[gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/process/

2023-10-21 Thread Zac Medico
commit: 6abc969109754ab086db2bac5be1029de1a015c3
Author: Zac Medico  gentoo  org>
AuthorDate: Fri Oct 20 04:11:48 2023 +
Commit: Zac Medico  gentoo  org>
CommitDate: Sun Oct 22 04:17:48 2023 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=6abc9691

ForkProcess: Implement fd_pipes via send_handle

This new fd_pipes implementation is only enabled
when the multiprocessing start method is not fork,
ensuring backward compatibility with existing
ForkProcess callers that rely on the fork start
method.

Note that the new fd_pipes implementation uses a
thread via run_in_executor, and threads are not
recommended for mixing with the fork start method
due to cpython issue 84559.

Bug: https://bugs.gentoo.org/915896
Signed-off-by: Zac Medico  gentoo.org>

 lib/portage/tests/process/test_ForkProcess.py |   7 ++
 lib/portage/util/_async/ForkProcess.py| 142 +-
 2 files changed, 124 insertions(+), 25 deletions(-)

diff --git a/lib/portage/tests/process/test_ForkProcess.py 
b/lib/portage/tests/process/test_ForkProcess.py
index c07c60e9c6..bc0b836f11 100644
--- a/lib/portage/tests/process/test_ForkProcess.py
+++ b/lib/portage/tests/process/test_ForkProcess.py
@@ -4,6 +4,7 @@
 import functools
 import multiprocessing
 import tempfile
+from unittest.mock import patch
 
 from portage import os
 from portage.tests import TestCase
@@ -37,3 +38,9 @@ class ForkProcessTestCase(TestCase):
 
 with open(logfile.name, "rb") as output:
 self.assertEqual(output.read(), test_string.encode("utf-8"))
+
+def test_spawn_logfile_no_send_handle(self):
+with patch(
+"portage.util._async.ForkProcess.ForkProcess._HAVE_SEND_HANDLE", 
new=False
+):
+self.test_spawn_logfile()

diff --git a/lib/portage/util/_async/ForkProcess.py 
b/lib/portage/util/_async/ForkProcess.py
index 09e40a2d3e..6d216a5c43 100644
--- a/lib/portage/util/_async/ForkProcess.py
+++ b/lib/portage/util/_async/ForkProcess.py
@@ -10,6 +10,7 @@ import sys
 
 import portage
 from portage import os
+from portage.cache.mappings import slot_dict_class
 from portage.util.futures import asyncio
 from _emerge.SpawnProcess import SpawnProcess
 
@@ -26,29 +27,36 @@ class ForkProcess(SpawnProcess):
 "_proc_join_task",
 )
 
+_file_names = ("connection", "slave_fd")
+_files_dict = slot_dict_class(_file_names, prefix="")
+
 # Number of seconds between poll attempts for process exit status
 # (after the sentinel has become ready).
 _proc_join_interval = 0.1
 
-def _start(self):
-if self.fd_pipes or self.logfile:
-if self.fd_pipes:
-if multiprocessing.get_start_method() != "fork":
-raise NotImplementedError(
-'fd_pipes only supported with multiprocessing start 
method "fork"'
-)
-super()._start()
-return
+_HAVE_SEND_HANDLE = getattr(multiprocessing.reduction, "HAVE_SEND_HANDLE", 
False)
 
-if self.logfile:
-if multiprocessing.get_start_method() == "fork":
-# Use superclass pty support.
-super()._start()
-return
+def _start(self):
+if multiprocessing.get_start_method() == "fork":
+# Backward compatibility mode.
+super()._start()
+return
+
+# This mode supports multiprocessing start methods
+# other than fork. Note that the fd_pipes implementation
+# uses a thread via run_in_executor, and threads are not
+# recommended for mixing with the fork start method due
+# to cpython issue 84559.
+if self.fd_pipes and not self._HAVE_SEND_HANDLE:
+raise NotImplementedError(
+'fd_pipes only supported with HAVE_SEND_HANDLE or 
multiprocessing start method "fork"'
+)
 
-# Log via multiprocessing.Pipe if necessary.
-pr, pw = multiprocessing.Pipe(duplex=False)
-self._child_connection = pw
+if self.fd_pipes or self.logfile:
+# Log via multiprocessing.Pipe if necessary.
+connection, self._child_connection = multiprocessing.Pipe(
+duplex=self._HAVE_SEND_HANDLE
+)
 
 retval = self._spawn(self.args, fd_pipes=self.fd_pipes)
 
@@ -59,11 +67,71 @@ class ForkProcess(SpawnProcess):
 self._async_waitpid()
 else:
 self._child_connection.close()
+self.fd_pipes = self.fd_pipes or {}
 stdout_fd = None
 if not self.background:
-stdout_fd = os.dup(sys.__stdout__.fileno())
+self.fd_pipes.setdefault(0, portage._get_stdin().fileno())
+self.fd_pipes.setdefault(1, sys.__stdout__.fileno())
+self.fd_pipes.setdefault(2, sys.__stderr__.fileno())
+

[gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/process/

2020-06-22 Thread Zac Medico
commit: 72ac22e722549833c1ee7e7ad1b585db55f7dafc
Author: Zac Medico  gentoo  org>
AuthorDate: Fri Jun 19 03:04:52 2020 +
Commit: Zac Medico  gentoo  org>
CommitDate: Tue Jun 23 02:13:05 2020 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=72ac22e7

PipeLogger: non-blocking write to pipe (bug 709746)

Add support to write to a non-blocking pipe instead of a
log file. This is needed for the purposes of bug 709746,
where PipeLogger will write to a pipe that is drained
by anoher PipeLogger instance which is running in the same
process.

Bug: https://bugs.gentoo.org/709746
Reviewed-by: Brian Dolbec  gentoo.org>
Signed-off-by: Zac Medico  gentoo.org>

 lib/portage/tests/process/test_PipeLogger.py | 58 +
 lib/portage/util/_async/PipeLogger.py| 75 +---
 2 files changed, 116 insertions(+), 17 deletions(-)

diff --git a/lib/portage/tests/process/test_PipeLogger.py 
b/lib/portage/tests/process/test_PipeLogger.py
new file mode 100644
index 0..2bd94cf39
--- /dev/null
+++ b/lib/portage/tests/process/test_PipeLogger.py
@@ -0,0 +1,58 @@
+# Copyright 2020 Gentoo Authors
+# Distributed under the terms of the GNU General Public License v2
+
+from portage import os
+from portage.tests import TestCase
+from portage.util._async.PipeLogger import PipeLogger
+from portage.util.futures import asyncio
+from portage.util.futures._asyncio.streams import _reader, _writer
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return
+from portage.util.futures.unix_events import _set_nonblocking
+
+
+class PipeLoggerTestCase(TestCase):
+
+   @coroutine
+   def _testPipeLoggerToPipe(self, test_string, loop=None):
+   """
+   Test PipeLogger writing to a pipe connected to a PipeReader.
+   This verifies that PipeLogger does not deadlock when writing
+   to a pipe that's drained by a PipeReader running in the same
+   process (requires non-blocking write).
+   """
+
+   input_fd, writer_pipe = os.pipe()
+   _set_nonblocking(writer_pipe)
+   writer_pipe = os.fdopen(writer_pipe, 'wb', 0)
+   writer = asyncio.ensure_future(_writer(writer_pipe, 
test_string.encode('ascii'), loop=loop), loop=loop)
+   writer.add_done_callback(lambda writer: writer_pipe.close())
+
+   pr, pw = os.pipe()
+
+   consumer = PipeLogger(background=True,
+   input_fd=input_fd,
+   log_file_path=os.fdopen(pw, 'wb', 0),
+   scheduler=loop)
+   consumer.start()
+
+   # Before starting the reader, wait here for a moment, in order
+   # to exercise PipeLogger's handling of EAGAIN during write.
+   yield asyncio.wait([writer], timeout=0.01)
+
+   reader = _reader(pr, loop=loop)
+   yield writer
+   content = yield reader
+   yield consumer.async_wait()
+
+   self.assertEqual(consumer.returncode, os.EX_OK)
+
+   coroutine_return(content.decode('ascii', 'replace'))
+
+   def testPipeLogger(self):
+   loop = asyncio._wrap_loop()
+
+   for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 
2**17, 2**17 + 1):
+   test_string = x * "a"
+   output = 
loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop))
+   self.assertEqual(test_string, output,
+   "x = %s, len(output) = %s" % (x, len(output)))

diff --git a/lib/portage/util/_async/PipeLogger.py 
b/lib/portage/util/_async/PipeLogger.py
index a4258f350..4271c8ee2 100644
--- a/lib/portage/util/_async/PipeLogger.py
+++ b/lib/portage/util/_async/PipeLogger.py
@@ -1,4 +1,4 @@
-# Copyright 2008-2018 Gentoo Foundation
+# Copyright 2008-2020 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 import fcntl
@@ -8,6 +8,10 @@ import sys
 
 import portage
 from portage import os, _encodings, _unicode_encode
+from portage.util.futures import asyncio
+from portage.util.futures._asyncio.streams import _writer
+from portage.util.futures.compat_coroutine import coroutine
+from portage.util.futures.unix_events import _set_nonblocking
 from _emerge.AbstractPollTask import AbstractPollTask
 
 class PipeLogger(AbstractPollTask):
@@ -21,13 +25,16 @@ class PipeLogger(AbstractPollTask):
"""
 
__slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
-   ("_log_file", "_log_file_real")
+   ("_io_loop_task", "_log_file", "_log_file_nb", "_log_file_real")
 
def _start(self):
 
log_file_path = self.log_file_path
-   if log_file_path is not None:
-
+   if hasattr(log_file_path, 'write'):
+   self._log_file_nb = True
+

[gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/process/

2020-06-15 Thread Zac Medico
commit: ca763549507d995e91a49753b13bcca8748fae6c
Author: Zac Medico  gentoo  org>
AuthorDate: Tue Jun 16 03:14:36 2020 +
Commit: Zac Medico  gentoo  org>
CommitDate: Tue Jun 16 03:14:49 2020 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=ca763549

Revert "PipeLogger: non-blocking write to pipe (bug 709746)"

This reverts commit 3e46825a047067a96ed997fe394f85e042e542a8.
We've had reports of emerge hangs, so reverting this for now.

Bug: https://bugs.gentoo.org/709746
Signed-off-by: Zac Medico  gentoo.org>

 lib/portage/tests/process/test_PipeLogger.py | 54 
 lib/portage/util/_async/PipeLogger.py| 73 +---
 2 files changed, 13 insertions(+), 114 deletions(-)

diff --git a/lib/portage/tests/process/test_PipeLogger.py 
b/lib/portage/tests/process/test_PipeLogger.py
deleted file mode 100644
index 2b9f10eeb..0
--- a/lib/portage/tests/process/test_PipeLogger.py
+++ /dev/null
@@ -1,54 +0,0 @@
-# Copyright 2020 Gentoo Authors
-# Distributed under the terms of the GNU General Public License v2
-
-from portage import os
-from portage.tests import TestCase
-from portage.util._async.PipeLogger import PipeLogger
-from portage.util.futures import asyncio
-from portage.util.futures._asyncio.streams import _reader, _writer
-from portage.util.futures.compat_coroutine import coroutine, coroutine_return
-
-
-class PipeLoggerTestCase(TestCase):
-
-   @coroutine
-   def _testPipeLoggerToPipe(self, test_string, loop=None):
-   """
-   Test PipeLogger writing to a pipe connected to a PipeReader.
-   This verifies that PipeLogger does not deadlock when writing
-   to a pipe that's drained by a PipeReader running in the same
-   process (requires non-blocking write).
-   """
-
-   pr1, pw1 = os.pipe()
-   writer = asyncio.ensure_future(_writer(pw1, 
test_string.encode('ascii')), loop=loop)
-
-   pr, pw = os.pipe()
-
-   consumer = PipeLogger(background=True,
-   input_fd=pr1,
-   log_file_path=os.fdopen(pw, 'wb', 0),
-   scheduler=loop)
-   consumer.start()
-
-   # Before starting the reader, wait here for a moment, in order
-   # to exercise PipeLogger's handling of EAGAIN during write.
-   yield asyncio.wait([writer], timeout=0.01)
-
-   reader = _reader(pr, loop=loop)
-   yield writer
-   content = yield reader
-   yield consumer.async_wait()
-
-   self.assertEqual(consumer.returncode, os.EX_OK)
-
-   coroutine_return(content.decode('ascii', 'replace'))
-
-   def testPipeLogger(self):
-   loop = asyncio._wrap_loop()
-
-   for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 
2**17, 2**17 + 1):
-   test_string = x * "a"
-   output = 
loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop))
-   self.assertEqual(test_string, output,
-   "x = %s, len(output) = %s" % (x, len(output)))

diff --git a/lib/portage/util/_async/PipeLogger.py 
b/lib/portage/util/_async/PipeLogger.py
index 1776cc860..a4258f350 100644
--- a/lib/portage/util/_async/PipeLogger.py
+++ b/lib/portage/util/_async/PipeLogger.py
@@ -8,9 +8,6 @@ import sys
 
 import portage
 from portage import os, _encodings, _unicode_encode
-from portage.util.futures import asyncio
-from portage.util.futures.compat_coroutine import coroutine
-from portage.util.futures.unix_events import _set_nonblocking
 from _emerge.AbstractPollTask import AbstractPollTask
 
 class PipeLogger(AbstractPollTask):
@@ -24,15 +21,13 @@ class PipeLogger(AbstractPollTask):
"""
 
__slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
-   ("_io_loop_task", "_log_file", "_log_file_real")
+   ("_log_file", "_log_file_real")
 
def _start(self):
 
log_file_path = self.log_file_path
-   if hasattr(log_file_path, 'write'):
-   self._log_file = log_file_path
-   _set_nonblocking(self._log_file.fileno())
-   elif log_file_path is not None:
+   if log_file_path is not None:
+
self._log_file = open(_unicode_encode(log_file_path,
encoding=_encodings['fs'], errors='strict'), 
mode='ab')
if log_file_path.endswith('.gz'):
@@ -62,8 +57,7 @@ class PipeLogger(AbstractPollTask):
fcntl.fcntl(fd, fcntl.F_SETFD,
fcntl.fcntl(fd, fcntl.F_GETFD) | 
fcntl.FD_CLOEXEC)
 
-   self._io_loop_task = asyncio.ensure_future(self._io_loop(fd), 
loop=self.scheduler)
-   

[gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/process/

2020-06-13 Thread Zac Medico
commit: 3e46825a047067a96ed997fe394f85e042e542a8
Author: Zac Medico  gentoo  org>
AuthorDate: Mon Feb 24 08:06:11 2020 +
Commit: Zac Medico  gentoo  org>
CommitDate: Sat Jun 13 06:30:01 2020 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=3e46825a

PipeLogger: non-blocking write to pipe (bug 709746)

Add support to write to a non-blocking pipe instead of a
log file. This is needed for the purposes of bug 709746,
where PipeLogger will write to a pipe that is drained
by anoher PipeLogger instance which is running in the same
process.

Bug: https://bugs.gentoo.org/709746
Signed-off-by: Zac Medico  gentoo.org>

 lib/portage/tests/process/test_PipeLogger.py | 54 +++
 lib/portage/util/_async/PipeLogger.py| 66 ++--
 2 files changed, 107 insertions(+), 13 deletions(-)

diff --git a/lib/portage/tests/process/test_PipeLogger.py 
b/lib/portage/tests/process/test_PipeLogger.py
new file mode 100644
index 0..2b9f10eeb
--- /dev/null
+++ b/lib/portage/tests/process/test_PipeLogger.py
@@ -0,0 +1,54 @@
+# Copyright 2020 Gentoo Authors
+# Distributed under the terms of the GNU General Public License v2
+
+from portage import os
+from portage.tests import TestCase
+from portage.util._async.PipeLogger import PipeLogger
+from portage.util.futures import asyncio
+from portage.util.futures._asyncio.streams import _reader, _writer
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return
+
+
+class PipeLoggerTestCase(TestCase):
+
+   @coroutine
+   def _testPipeLoggerToPipe(self, test_string, loop=None):
+   """
+   Test PipeLogger writing to a pipe connected to a PipeReader.
+   This verifies that PipeLogger does not deadlock when writing
+   to a pipe that's drained by a PipeReader running in the same
+   process (requires non-blocking write).
+   """
+
+   pr1, pw1 = os.pipe()
+   writer = asyncio.ensure_future(_writer(pw1, 
test_string.encode('ascii')), loop=loop)
+
+   pr, pw = os.pipe()
+
+   consumer = PipeLogger(background=True,
+   input_fd=pr1,
+   log_file_path=os.fdopen(pw, 'wb', 0),
+   scheduler=loop)
+   consumer.start()
+
+   # Before starting the reader, wait here for a moment, in order
+   # to exercise PipeLogger's handling of EAGAIN during write.
+   yield asyncio.wait([writer], timeout=0.01)
+
+   reader = _reader(pr, loop=loop)
+   yield writer
+   content = yield reader
+   yield consumer.async_wait()
+
+   self.assertEqual(consumer.returncode, os.EX_OK)
+
+   coroutine_return(content.decode('ascii', 'replace'))
+
+   def testPipeLogger(self):
+   loop = asyncio._wrap_loop()
+
+   for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 
2**17, 2**17 + 1):
+   test_string = x * "a"
+   output = 
loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop))
+   self.assertEqual(test_string, output,
+   "x = %s, len(output) = %s" % (x, len(output)))

diff --git a/lib/portage/util/_async/PipeLogger.py 
b/lib/portage/util/_async/PipeLogger.py
index a4258f350..83669e05e 100644
--- a/lib/portage/util/_async/PipeLogger.py
+++ b/lib/portage/util/_async/PipeLogger.py
@@ -8,6 +8,9 @@ import sys
 
 import portage
 from portage import os, _encodings, _unicode_encode
+from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import coroutine
+from portage.util.futures.unix_events import _set_nonblocking
 from _emerge.AbstractPollTask import AbstractPollTask
 
 class PipeLogger(AbstractPollTask):
@@ -21,13 +24,15 @@ class PipeLogger(AbstractPollTask):
"""
 
__slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
-   ("_log_file", "_log_file_real")
+   ("_io_loop_task", "_log_file", "_log_file_real")
 
def _start(self):
 
log_file_path = self.log_file_path
-   if log_file_path is not None:
-
+   if hasattr(log_file_path, 'write'):
+   self._log_file = log_file_path
+   _set_nonblocking(self._log_file.fileno())
+   elif log_file_path is not None:
self._log_file = open(_unicode_encode(log_file_path,
encoding=_encodings['fs'], errors='strict'), 
mode='ab')
if log_file_path.endswith('.gz'):
@@ -57,7 +62,8 @@ class PipeLogger(AbstractPollTask):
fcntl.fcntl(fd, fcntl.F_SETFD,
fcntl.fcntl(fd, fcntl.F_GETFD) | 
fcntl.FD_CLOEXEC)
 
-   

[gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/process/

2020-04-07 Thread Zac Medico
commit: f1e9389d64b6ded41d0dac99979a049cfa27e75c
Author: Zac Medico  gentoo  org>
AuthorDate: Wed Apr  8 05:00:11 2020 +
Commit: Zac Medico  gentoo  org>
CommitDate: Wed Apr  8 05:29:48 2020 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=f1e9389d

Revert "PipeLogger: non-blocking write to pipe (bug 709746)"

This reverts commit 27712651aa7014a960b012dc89457df09677edc1.

Bug: https://bugs.gentoo.org/716636
Signed-off-by: Zac Medico  gentoo.org>

 lib/portage/tests/process/test_PopenProcess.py | 41 +---
 lib/portage/util/_async/PipeLogger.py  | 67 +-
 2 files changed, 14 insertions(+), 94 deletions(-)

diff --git a/lib/portage/tests/process/test_PopenProcess.py 
b/lib/portage/tests/process/test_PopenProcess.py
index d4e97f210..ed506b814 100644
--- a/lib/portage/tests/process/test_PopenProcess.py
+++ b/lib/portage/tests/process/test_PopenProcess.py
@@ -9,8 +9,6 @@ from portage.tests import TestCase
 from portage.util._async.PipeLogger import PipeLogger
 from portage.util._async.PopenProcess import PopenProcess
 from portage.util._eventloop.global_event_loop import global_event_loop
-from portage.util.futures._asyncio.streams import _reader
-from portage.util.futures.compat_coroutine import coroutine, coroutine_return
 from _emerge.PipeReader import PipeReader
 
 class PopenPipeTestCase(TestCase):
@@ -75,41 +73,8 @@ class PopenPipeTestCase(TestCase):
 
return content.decode('ascii', 'replace')
 
-   @coroutine
-   def _testPipeLoggerToPipe(self, test_string, loop=None):
-   """
-   Test PipeLogger writing to a pipe connected to a PipeReader.
-   This verifies that PipeLogger does not deadlock when writing
-   to a pipe that's drained by a PipeReader running in the same
-   process (requires non-blocking write).
-   """
-
-   producer = PopenProcess(proc=subprocess.Popen(
-   ["bash", "-c", self._echo_cmd % test_string],
-   stdout=subprocess.PIPE, stderr=subprocess.STDOUT),
-   scheduler=loop)
-
-   pr, pw = os.pipe()
-
-   consumer = producer.pipe_reader = PipeLogger(background=True,
-   input_fd=producer.proc.stdout,
-   log_file_path=os.fdopen(pw, 'wb', 0))
-
-   reader = _reader(pr, loop=loop)
-   yield producer.async_start()
-   content = yield reader
-   yield producer.async_wait()
-   yield consumer.async_wait()
-
-   self.assertEqual(producer.returncode, os.EX_OK)
-   self.assertEqual(consumer.returncode, os.EX_OK)
-
-   coroutine_return(content.decode('ascii', 'replace'))
-
def testPopenPipe(self):
-   loop = global_event_loop()
-
-   for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 
2**15, 2**16):
+   for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14):
test_string = x * "a"
output = self._testPipeReader(test_string)
self.assertEqual(test_string, output,
@@ -118,7 +83,3 @@ class PopenPipeTestCase(TestCase):
output = self._testPipeLogger(test_string)
self.assertEqual(test_string, output,
"x = %s, len(output) = %s" % (x, len(output)))
-
-   output = 
loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop))
-   self.assertEqual(test_string, output,
-   "x = %s, len(output) = %s" % (x, len(output)))

diff --git a/lib/portage/util/_async/PipeLogger.py 
b/lib/portage/util/_async/PipeLogger.py
index 6b03988a1..a4258f350 100644
--- a/lib/portage/util/_async/PipeLogger.py
+++ b/lib/portage/util/_async/PipeLogger.py
@@ -8,9 +8,6 @@ import sys
 
 import portage
 from portage import os, _encodings, _unicode_encode
-from portage.util.futures import asyncio
-from portage.util.futures.compat_coroutine import coroutine
-from portage.util.futures.unix_events import _set_nonblocking
 from _emerge.AbstractPollTask import AbstractPollTask
 
 class PipeLogger(AbstractPollTask):
@@ -24,15 +21,13 @@ class PipeLogger(AbstractPollTask):
"""
 
__slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
-   ("_io_loop_task", "_log_file", "_log_file_real")
+   ("_log_file", "_log_file_real")
 
def _start(self):
 
log_file_path = self.log_file_path
-   if hasattr(log_file_path, 'write'):
-   self._log_file = log_file_path
-   _set_nonblocking(self._log_file.fileno())
-   elif log_file_path is not None:
+   if log_file_path is not None:
+
self._log_file = 

[gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/process/

2020-02-24 Thread Zac Medico
commit: 27712651aa7014a960b012dc89457df09677edc1
Author: Zac Medico  gentoo  org>
AuthorDate: Mon Feb 24 08:06:11 2020 +
Commit: Zac Medico  gentoo  org>
CommitDate: Mon Feb 24 10:26:33 2020 +
URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=27712651

PipeLogger: non-blocking write to pipe (bug 709746)

Add support to write to a non-blocking pipe instead of a
log file. This is needed for the purposes of bug 709746,
where PipeLogger will write to a pipe that is drained
by anoher PipeLogger instance which is running in the same
process.

Bug: https://bugs.gentoo.org/709746
Signed-off-by: Zac Medico  gentoo.org>

 lib/portage/tests/process/test_PopenProcess.py | 41 +++-
 lib/portage/util/_async/PipeLogger.py  | 67 +-
 2 files changed, 94 insertions(+), 14 deletions(-)

diff --git a/lib/portage/tests/process/test_PopenProcess.py 
b/lib/portage/tests/process/test_PopenProcess.py
index ed506b814..d4e97f210 100644
--- a/lib/portage/tests/process/test_PopenProcess.py
+++ b/lib/portage/tests/process/test_PopenProcess.py
@@ -9,6 +9,8 @@ from portage.tests import TestCase
 from portage.util._async.PipeLogger import PipeLogger
 from portage.util._async.PopenProcess import PopenProcess
 from portage.util._eventloop.global_event_loop import global_event_loop
+from portage.util.futures._asyncio.streams import _reader
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return
 from _emerge.PipeReader import PipeReader
 
 class PopenPipeTestCase(TestCase):
@@ -73,8 +75,41 @@ class PopenPipeTestCase(TestCase):
 
return content.decode('ascii', 'replace')
 
+   @coroutine
+   def _testPipeLoggerToPipe(self, test_string, loop=None):
+   """
+   Test PipeLogger writing to a pipe connected to a PipeReader.
+   This verifies that PipeLogger does not deadlock when writing
+   to a pipe that's drained by a PipeReader running in the same
+   process (requires non-blocking write).
+   """
+
+   producer = PopenProcess(proc=subprocess.Popen(
+   ["bash", "-c", self._echo_cmd % test_string],
+   stdout=subprocess.PIPE, stderr=subprocess.STDOUT),
+   scheduler=loop)
+
+   pr, pw = os.pipe()
+
+   consumer = producer.pipe_reader = PipeLogger(background=True,
+   input_fd=producer.proc.stdout,
+   log_file_path=os.fdopen(pw, 'wb', 0))
+
+   reader = _reader(pr, loop=loop)
+   yield producer.async_start()
+   content = yield reader
+   yield producer.async_wait()
+   yield consumer.async_wait()
+
+   self.assertEqual(producer.returncode, os.EX_OK)
+   self.assertEqual(consumer.returncode, os.EX_OK)
+
+   coroutine_return(content.decode('ascii', 'replace'))
+
def testPopenPipe(self):
-   for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14):
+   loop = global_event_loop()
+
+   for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 
2**15, 2**16):
test_string = x * "a"
output = self._testPipeReader(test_string)
self.assertEqual(test_string, output,
@@ -83,3 +118,7 @@ class PopenPipeTestCase(TestCase):
output = self._testPipeLogger(test_string)
self.assertEqual(test_string, output,
"x = %s, len(output) = %s" % (x, len(output)))
+
+   output = 
loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop))
+   self.assertEqual(test_string, output,
+   "x = %s, len(output) = %s" % (x, len(output)))

diff --git a/lib/portage/util/_async/PipeLogger.py 
b/lib/portage/util/_async/PipeLogger.py
index a4258f350..6b03988a1 100644
--- a/lib/portage/util/_async/PipeLogger.py
+++ b/lib/portage/util/_async/PipeLogger.py
@@ -8,6 +8,9 @@ import sys
 
 import portage
 from portage import os, _encodings, _unicode_encode
+from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import coroutine
+from portage.util.futures.unix_events import _set_nonblocking
 from _emerge.AbstractPollTask import AbstractPollTask
 
 class PipeLogger(AbstractPollTask):
@@ -21,13 +24,15 @@ class PipeLogger(AbstractPollTask):
"""
 
__slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
-   ("_log_file", "_log_file_real")
+   ("_io_loop_task", "_log_file", "_log_file_real")
 
def _start(self):
 
log_file_path = self.log_file_path
-   if log_file_path is not None:
-
+   if hasattr(log_file_path, 'write'):
+   self._log_file =