[gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/process/
commit: 6abc969109754ab086db2bac5be1029de1a015c3 Author: Zac Medico gentoo org> AuthorDate: Fri Oct 20 04:11:48 2023 + Commit: Zac Medico gentoo org> CommitDate: Sun Oct 22 04:17:48 2023 + URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=6abc9691 ForkProcess: Implement fd_pipes via send_handle This new fd_pipes implementation is only enabled when the multiprocessing start method is not fork, ensuring backward compatibility with existing ForkProcess callers that rely on the fork start method. Note that the new fd_pipes implementation uses a thread via run_in_executor, and threads are not recommended for mixing with the fork start method due to cpython issue 84559. Bug: https://bugs.gentoo.org/915896 Signed-off-by: Zac Medico gentoo.org> lib/portage/tests/process/test_ForkProcess.py | 7 ++ lib/portage/util/_async/ForkProcess.py| 142 +- 2 files changed, 124 insertions(+), 25 deletions(-) diff --git a/lib/portage/tests/process/test_ForkProcess.py b/lib/portage/tests/process/test_ForkProcess.py index c07c60e9c6..bc0b836f11 100644 --- a/lib/portage/tests/process/test_ForkProcess.py +++ b/lib/portage/tests/process/test_ForkProcess.py @@ -4,6 +4,7 @@ import functools import multiprocessing import tempfile +from unittest.mock import patch from portage import os from portage.tests import TestCase @@ -37,3 +38,9 @@ class ForkProcessTestCase(TestCase): with open(logfile.name, "rb") as output: self.assertEqual(output.read(), test_string.encode("utf-8")) + +def test_spawn_logfile_no_send_handle(self): +with patch( +"portage.util._async.ForkProcess.ForkProcess._HAVE_SEND_HANDLE", new=False +): +self.test_spawn_logfile() diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py index 09e40a2d3e..6d216a5c43 100644 --- a/lib/portage/util/_async/ForkProcess.py +++ b/lib/portage/util/_async/ForkProcess.py @@ -10,6 +10,7 @@ import sys import portage from portage import os +from portage.cache.mappings import slot_dict_class from 
portage.util.futures import asyncio from _emerge.SpawnProcess import SpawnProcess @@ -26,29 +27,36 @@ class ForkProcess(SpawnProcess): "_proc_join_task", ) +_file_names = ("connection", "slave_fd") +_files_dict = slot_dict_class(_file_names, prefix="") + # Number of seconds between poll attempts for process exit status # (after the sentinel has become ready). _proc_join_interval = 0.1 -def _start(self): -if self.fd_pipes or self.logfile: -if self.fd_pipes: -if multiprocessing.get_start_method() != "fork": -raise NotImplementedError( -'fd_pipes only supported with multiprocessing start method "fork"' -) -super()._start() -return +_HAVE_SEND_HANDLE = getattr(multiprocessing.reduction, "HAVE_SEND_HANDLE", False) -if self.logfile: -if multiprocessing.get_start_method() == "fork": -# Use superclass pty support. -super()._start() -return +def _start(self): +if multiprocessing.get_start_method() == "fork": +# Backward compatibility mode. +super()._start() +return + +# This mode supports multiprocessing start methods +# other than fork. Note that the fd_pipes implementation +# uses a thread via run_in_executor, and threads are not +# recommended for mixing with the fork start method due +# to cpython issue 84559. +if self.fd_pipes and not self._HAVE_SEND_HANDLE: +raise NotImplementedError( +'fd_pipes only supported with HAVE_SEND_HANDLE or multiprocessing start method "fork"' +) -# Log via multiprocessing.Pipe if necessary. -pr, pw = multiprocessing.Pipe(duplex=False) -self._child_connection = pw +if self.fd_pipes or self.logfile: +# Log via multiprocessing.Pipe if necessary. 
+connection, self._child_connection = multiprocessing.Pipe( +duplex=self._HAVE_SEND_HANDLE +) retval = self._spawn(self.args, fd_pipes=self.fd_pipes) @@ -59,11 +67,71 @@ class ForkProcess(SpawnProcess): self._async_waitpid() else: self._child_connection.close() +self.fd_pipes = self.fd_pipes or {} stdout_fd = None if not self.background: -stdout_fd = os.dup(sys.__stdout__.fileno()) +self.fd_pipes.setdefault(0, portage._get_stdin().fileno()) +self.fd_pipes.setdefault(1, sys.__stdout__.fileno()) +self.fd_pipes.setdefault(2, sys.__stderr__.fileno()) +
[gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/process/
commit: 72ac22e722549833c1ee7e7ad1b585db55f7dafc Author: Zac Medico gentoo org> AuthorDate: Fri Jun 19 03:04:52 2020 + Commit: Zac Medico gentoo org> CommitDate: Tue Jun 23 02:13:05 2020 + URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=72ac22e7 PipeLogger: non-blocking write to pipe (bug 709746) Add support to write to a non-blocking pipe instead of a log file. This is needed for the purposes of bug 709746, where PipeLogger will write to a pipe that is drained by another PipeLogger instance which is running in the same process. Bug: https://bugs.gentoo.org/709746 Reviewed-by: Brian Dolbec gentoo.org> Signed-off-by: Zac Medico gentoo.org> lib/portage/tests/process/test_PipeLogger.py | 58 + lib/portage/util/_async/PipeLogger.py| 75 +--- 2 files changed, 116 insertions(+), 17 deletions(-) diff --git a/lib/portage/tests/process/test_PipeLogger.py b/lib/portage/tests/process/test_PipeLogger.py new file mode 100644 index 0..2bd94cf39 --- /dev/null +++ b/lib/portage/tests/process/test_PipeLogger.py @@ -0,0 +1,58 @@ +# Copyright 2020 Gentoo Authors +# Distributed under the terms of the GNU General Public License v2 + +from portage import os +from portage.tests import TestCase +from portage.util._async.PipeLogger import PipeLogger +from portage.util.futures import asyncio +from portage.util.futures._asyncio.streams import _reader, _writer +from portage.util.futures.compat_coroutine import coroutine, coroutine_return +from portage.util.futures.unix_events import _set_nonblocking + + +class PipeLoggerTestCase(TestCase): + + @coroutine + def _testPipeLoggerToPipe(self, test_string, loop=None): + """ + Test PipeLogger writing to a pipe connected to a PipeReader. + This verifies that PipeLogger does not deadlock when writing + to a pipe that's drained by a PipeReader running in the same + process (requires non-blocking write). 
+ """ + + input_fd, writer_pipe = os.pipe() + _set_nonblocking(writer_pipe) + writer_pipe = os.fdopen(writer_pipe, 'wb', 0) + writer = asyncio.ensure_future(_writer(writer_pipe, test_string.encode('ascii'), loop=loop), loop=loop) + writer.add_done_callback(lambda writer: writer_pipe.close()) + + pr, pw = os.pipe() + + consumer = PipeLogger(background=True, + input_fd=input_fd, + log_file_path=os.fdopen(pw, 'wb', 0), + scheduler=loop) + consumer.start() + + # Before starting the reader, wait here for a moment, in order + # to exercise PipeLogger's handling of EAGAIN during write. + yield asyncio.wait([writer], timeout=0.01) + + reader = _reader(pr, loop=loop) + yield writer + content = yield reader + yield consumer.async_wait() + + self.assertEqual(consumer.returncode, os.EX_OK) + + coroutine_return(content.decode('ascii', 'replace')) + + def testPipeLogger(self): + loop = asyncio._wrap_loop() + + for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 2**17, 2**17 + 1): + test_string = x * "a" + output = loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop)) + self.assertEqual(test_string, output, + "x = %s, len(output) = %s" % (x, len(output))) diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py index a4258f350..4271c8ee2 100644 --- a/lib/portage/util/_async/PipeLogger.py +++ b/lib/portage/util/_async/PipeLogger.py @@ -1,4 +1,4 @@ -# Copyright 2008-2018 Gentoo Foundation +# Copyright 2008-2020 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 import fcntl @@ -8,6 +8,10 @@ import sys import portage from portage import os, _encodings, _unicode_encode +from portage.util.futures import asyncio +from portage.util.futures._asyncio.streams import _writer +from portage.util.futures.compat_coroutine import coroutine +from portage.util.futures.unix_events import _set_nonblocking from _emerge.AbstractPollTask import AbstractPollTask class PipeLogger(AbstractPollTask): @@ -21,13 
+25,16 @@ class PipeLogger(AbstractPollTask): """ __slots__ = ("input_fd", "log_file_path", "stdout_fd") + \ - ("_log_file", "_log_file_real") + ("_io_loop_task", "_log_file", "_log_file_nb", "_log_file_real") def _start(self): log_file_path = self.log_file_path - if log_file_path is not None: - + if hasattr(log_file_path, 'write'): + self._log_file_nb = True +
[gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/process/
commit: ca763549507d995e91a49753b13bcca8748fae6c Author: Zac Medico gentoo org> AuthorDate: Tue Jun 16 03:14:36 2020 + Commit: Zac Medico gentoo org> CommitDate: Tue Jun 16 03:14:49 2020 + URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=ca763549 Revert "PipeLogger: non-blocking write to pipe (bug 709746)" This reverts commit 3e46825a047067a96ed997fe394f85e042e542a8. We've had reports of emerge hangs, so reverting this for now. Bug: https://bugs.gentoo.org/709746 Signed-off-by: Zac Medico gentoo.org> lib/portage/tests/process/test_PipeLogger.py | 54 lib/portage/util/_async/PipeLogger.py| 73 +--- 2 files changed, 13 insertions(+), 114 deletions(-) diff --git a/lib/portage/tests/process/test_PipeLogger.py b/lib/portage/tests/process/test_PipeLogger.py deleted file mode 100644 index 2b9f10eeb..0 --- a/lib/portage/tests/process/test_PipeLogger.py +++ /dev/null @@ -1,54 +0,0 @@ -# Copyright 2020 Gentoo Authors -# Distributed under the terms of the GNU General Public License v2 - -from portage import os -from portage.tests import TestCase -from portage.util._async.PipeLogger import PipeLogger -from portage.util.futures import asyncio -from portage.util.futures._asyncio.streams import _reader, _writer -from portage.util.futures.compat_coroutine import coroutine, coroutine_return - - -class PipeLoggerTestCase(TestCase): - - @coroutine - def _testPipeLoggerToPipe(self, test_string, loop=None): - """ - Test PipeLogger writing to a pipe connected to a PipeReader. - This verifies that PipeLogger does not deadlock when writing - to a pipe that's drained by a PipeReader running in the same - process (requires non-blocking write). 
- """ - - pr1, pw1 = os.pipe() - writer = asyncio.ensure_future(_writer(pw1, test_string.encode('ascii')), loop=loop) - - pr, pw = os.pipe() - - consumer = PipeLogger(background=True, - input_fd=pr1, - log_file_path=os.fdopen(pw, 'wb', 0), - scheduler=loop) - consumer.start() - - # Before starting the reader, wait here for a moment, in order - # to exercise PipeLogger's handling of EAGAIN during write. - yield asyncio.wait([writer], timeout=0.01) - - reader = _reader(pr, loop=loop) - yield writer - content = yield reader - yield consumer.async_wait() - - self.assertEqual(consumer.returncode, os.EX_OK) - - coroutine_return(content.decode('ascii', 'replace')) - - def testPipeLogger(self): - loop = asyncio._wrap_loop() - - for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 2**17, 2**17 + 1): - test_string = x * "a" - output = loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop)) - self.assertEqual(test_string, output, - "x = %s, len(output) = %s" % (x, len(output))) diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py index 1776cc860..a4258f350 100644 --- a/lib/portage/util/_async/PipeLogger.py +++ b/lib/portage/util/_async/PipeLogger.py @@ -8,9 +8,6 @@ import sys import portage from portage import os, _encodings, _unicode_encode -from portage.util.futures import asyncio -from portage.util.futures.compat_coroutine import coroutine -from portage.util.futures.unix_events import _set_nonblocking from _emerge.AbstractPollTask import AbstractPollTask class PipeLogger(AbstractPollTask): @@ -24,15 +21,13 @@ class PipeLogger(AbstractPollTask): """ __slots__ = ("input_fd", "log_file_path", "stdout_fd") + \ - ("_io_loop_task", "_log_file", "_log_file_real") + ("_log_file", "_log_file_real") def _start(self): log_file_path = self.log_file_path - if hasattr(log_file_path, 'write'): - self._log_file = log_file_path - _set_nonblocking(self._log_file.fileno()) - elif log_file_path is not None: + if log_file_path 
is not None: + self._log_file = open(_unicode_encode(log_file_path, encoding=_encodings['fs'], errors='strict'), mode='ab') if log_file_path.endswith('.gz'): @@ -62,8 +57,7 @@ class PipeLogger(AbstractPollTask): fcntl.fcntl(fd, fcntl.F_SETFD, fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC) - self._io_loop_task = asyncio.ensure_future(self._io_loop(fd), loop=self.scheduler) -
[gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/process/
commit: 3e46825a047067a96ed997fe394f85e042e542a8 Author: Zac Medico gentoo org> AuthorDate: Mon Feb 24 08:06:11 2020 + Commit: Zac Medico gentoo org> CommitDate: Sat Jun 13 06:30:01 2020 + URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=3e46825a PipeLogger: non-blocking write to pipe (bug 709746) Add support to write to a non-blocking pipe instead of a log file. This is needed for the purposes of bug 709746, where PipeLogger will write to a pipe that is drained by another PipeLogger instance which is running in the same process. Bug: https://bugs.gentoo.org/709746 Signed-off-by: Zac Medico gentoo.org> lib/portage/tests/process/test_PipeLogger.py | 54 +++ lib/portage/util/_async/PipeLogger.py| 66 ++-- 2 files changed, 107 insertions(+), 13 deletions(-) diff --git a/lib/portage/tests/process/test_PipeLogger.py b/lib/portage/tests/process/test_PipeLogger.py new file mode 100644 index 0..2b9f10eeb --- /dev/null +++ b/lib/portage/tests/process/test_PipeLogger.py @@ -0,0 +1,54 @@ +# Copyright 2020 Gentoo Authors +# Distributed under the terms of the GNU General Public License v2 + +from portage import os +from portage.tests import TestCase +from portage.util._async.PipeLogger import PipeLogger +from portage.util.futures import asyncio +from portage.util.futures._asyncio.streams import _reader, _writer +from portage.util.futures.compat_coroutine import coroutine, coroutine_return + + +class PipeLoggerTestCase(TestCase): + + @coroutine + def _testPipeLoggerToPipe(self, test_string, loop=None): + """ + Test PipeLogger writing to a pipe connected to a PipeReader. + This verifies that PipeLogger does not deadlock when writing + to a pipe that's drained by a PipeReader running in the same + process (requires non-blocking write). 
+ """ + + pr1, pw1 = os.pipe() + writer = asyncio.ensure_future(_writer(pw1, test_string.encode('ascii')), loop=loop) + + pr, pw = os.pipe() + + consumer = PipeLogger(background=True, + input_fd=pr1, + log_file_path=os.fdopen(pw, 'wb', 0), + scheduler=loop) + consumer.start() + + # Before starting the reader, wait here for a moment, in order + # to exercise PipeLogger's handling of EAGAIN during write. + yield asyncio.wait([writer], timeout=0.01) + + reader = _reader(pr, loop=loop) + yield writer + content = yield reader + yield consumer.async_wait() + + self.assertEqual(consumer.returncode, os.EX_OK) + + coroutine_return(content.decode('ascii', 'replace')) + + def testPipeLogger(self): + loop = asyncio._wrap_loop() + + for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 2**17, 2**17 + 1): + test_string = x * "a" + output = loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop)) + self.assertEqual(test_string, output, + "x = %s, len(output) = %s" % (x, len(output))) diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py index a4258f350..83669e05e 100644 --- a/lib/portage/util/_async/PipeLogger.py +++ b/lib/portage/util/_async/PipeLogger.py @@ -8,6 +8,9 @@ import sys import portage from portage import os, _encodings, _unicode_encode +from portage.util.futures import asyncio +from portage.util.futures.compat_coroutine import coroutine +from portage.util.futures.unix_events import _set_nonblocking from _emerge.AbstractPollTask import AbstractPollTask class PipeLogger(AbstractPollTask): @@ -21,13 +24,15 @@ class PipeLogger(AbstractPollTask): """ __slots__ = ("input_fd", "log_file_path", "stdout_fd") + \ - ("_log_file", "_log_file_real") + ("_io_loop_task", "_log_file", "_log_file_real") def _start(self): log_file_path = self.log_file_path - if log_file_path is not None: - + if hasattr(log_file_path, 'write'): + self._log_file = log_file_path + _set_nonblocking(self._log_file.fileno()) + elif 
log_file_path is not None: self._log_file = open(_unicode_encode(log_file_path, encoding=_encodings['fs'], errors='strict'), mode='ab') if log_file_path.endswith('.gz'): @@ -57,7 +62,8 @@ class PipeLogger(AbstractPollTask): fcntl.fcntl(fd, fcntl.F_SETFD, fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC) -
[gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/process/
commit: f1e9389d64b6ded41d0dac99979a049cfa27e75c Author: Zac Medico gentoo org> AuthorDate: Wed Apr 8 05:00:11 2020 + Commit: Zac Medico gentoo org> CommitDate: Wed Apr 8 05:29:48 2020 + URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=f1e9389d Revert "PipeLogger: non-blocking write to pipe (bug 709746)" This reverts commit 27712651aa7014a960b012dc89457df09677edc1. Bug: https://bugs.gentoo.org/716636 Signed-off-by: Zac Medico gentoo.org> lib/portage/tests/process/test_PopenProcess.py | 41 +--- lib/portage/util/_async/PipeLogger.py | 67 +- 2 files changed, 14 insertions(+), 94 deletions(-) diff --git a/lib/portage/tests/process/test_PopenProcess.py b/lib/portage/tests/process/test_PopenProcess.py index d4e97f210..ed506b814 100644 --- a/lib/portage/tests/process/test_PopenProcess.py +++ b/lib/portage/tests/process/test_PopenProcess.py @@ -9,8 +9,6 @@ from portage.tests import TestCase from portage.util._async.PipeLogger import PipeLogger from portage.util._async.PopenProcess import PopenProcess from portage.util._eventloop.global_event_loop import global_event_loop -from portage.util.futures._asyncio.streams import _reader -from portage.util.futures.compat_coroutine import coroutine, coroutine_return from _emerge.PipeReader import PipeReader class PopenPipeTestCase(TestCase): @@ -75,41 +73,8 @@ class PopenPipeTestCase(TestCase): return content.decode('ascii', 'replace') - @coroutine - def _testPipeLoggerToPipe(self, test_string, loop=None): - """ - Test PipeLogger writing to a pipe connected to a PipeReader. - This verifies that PipeLogger does not deadlock when writing - to a pipe that's drained by a PipeReader running in the same - process (requires non-blocking write). 
- """ - - producer = PopenProcess(proc=subprocess.Popen( - ["bash", "-c", self._echo_cmd % test_string], - stdout=subprocess.PIPE, stderr=subprocess.STDOUT), - scheduler=loop) - - pr, pw = os.pipe() - - consumer = producer.pipe_reader = PipeLogger(background=True, - input_fd=producer.proc.stdout, - log_file_path=os.fdopen(pw, 'wb', 0)) - - reader = _reader(pr, loop=loop) - yield producer.async_start() - content = yield reader - yield producer.async_wait() - yield consumer.async_wait() - - self.assertEqual(producer.returncode, os.EX_OK) - self.assertEqual(consumer.returncode, os.EX_OK) - - coroutine_return(content.decode('ascii', 'replace')) - def testPopenPipe(self): - loop = global_event_loop() - - for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 2**15, 2**16): + for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14): test_string = x * "a" output = self._testPipeReader(test_string) self.assertEqual(test_string, output, @@ -118,7 +83,3 @@ class PopenPipeTestCase(TestCase): output = self._testPipeLogger(test_string) self.assertEqual(test_string, output, "x = %s, len(output) = %s" % (x, len(output))) - - output = loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop)) - self.assertEqual(test_string, output, - "x = %s, len(output) = %s" % (x, len(output))) diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py index 6b03988a1..a4258f350 100644 --- a/lib/portage/util/_async/PipeLogger.py +++ b/lib/portage/util/_async/PipeLogger.py @@ -8,9 +8,6 @@ import sys import portage from portage import os, _encodings, _unicode_encode -from portage.util.futures import asyncio -from portage.util.futures.compat_coroutine import coroutine -from portage.util.futures.unix_events import _set_nonblocking from _emerge.AbstractPollTask import AbstractPollTask class PipeLogger(AbstractPollTask): @@ -24,15 +21,13 @@ class PipeLogger(AbstractPollTask): """ __slots__ = ("input_fd", "log_file_path", "stdout_fd") + \ - 
("_io_loop_task", "_log_file", "_log_file_real") + ("_log_file", "_log_file_real") def _start(self): log_file_path = self.log_file_path - if hasattr(log_file_path, 'write'): - self._log_file = log_file_path - _set_nonblocking(self._log_file.fileno()) - elif log_file_path is not None: + if log_file_path is not None: + self._log_file =
[gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/process/
commit: 27712651aa7014a960b012dc89457df09677edc1 Author: Zac Medico gentoo org> AuthorDate: Mon Feb 24 08:06:11 2020 + Commit: Zac Medico gentoo org> CommitDate: Mon Feb 24 10:26:33 2020 + URL:https://gitweb.gentoo.org/proj/portage.git/commit/?id=27712651 PipeLogger: non-blocking write to pipe (bug 709746) Add support to write to a non-blocking pipe instead of a log file. This is needed for the purposes of bug 709746, where PipeLogger will write to a pipe that is drained by another PipeLogger instance which is running in the same process. Bug: https://bugs.gentoo.org/709746 Signed-off-by: Zac Medico gentoo.org> lib/portage/tests/process/test_PopenProcess.py | 41 +++- lib/portage/util/_async/PipeLogger.py | 67 +- 2 files changed, 94 insertions(+), 14 deletions(-) diff --git a/lib/portage/tests/process/test_PopenProcess.py b/lib/portage/tests/process/test_PopenProcess.py index ed506b814..d4e97f210 100644 --- a/lib/portage/tests/process/test_PopenProcess.py +++ b/lib/portage/tests/process/test_PopenProcess.py @@ -9,6 +9,8 @@ from portage.tests import TestCase from portage.util._async.PipeLogger import PipeLogger from portage.util._async.PopenProcess import PopenProcess from portage.util._eventloop.global_event_loop import global_event_loop +from portage.util.futures._asyncio.streams import _reader +from portage.util.futures.compat_coroutine import coroutine, coroutine_return from _emerge.PipeReader import PipeReader class PopenPipeTestCase(TestCase): @@ -73,8 +75,41 @@ class PopenPipeTestCase(TestCase): return content.decode('ascii', 'replace') + @coroutine + def _testPipeLoggerToPipe(self, test_string, loop=None): + """ + Test PipeLogger writing to a pipe connected to a PipeReader. + This verifies that PipeLogger does not deadlock when writing + to a pipe that's drained by a PipeReader running in the same + process (requires non-blocking write). 
+ """ + + producer = PopenProcess(proc=subprocess.Popen( + ["bash", "-c", self._echo_cmd % test_string], + stdout=subprocess.PIPE, stderr=subprocess.STDOUT), + scheduler=loop) + + pr, pw = os.pipe() + + consumer = producer.pipe_reader = PipeLogger(background=True, + input_fd=producer.proc.stdout, + log_file_path=os.fdopen(pw, 'wb', 0)) + + reader = _reader(pr, loop=loop) + yield producer.async_start() + content = yield reader + yield producer.async_wait() + yield consumer.async_wait() + + self.assertEqual(producer.returncode, os.EX_OK) + self.assertEqual(consumer.returncode, os.EX_OK) + + coroutine_return(content.decode('ascii', 'replace')) + def testPopenPipe(self): - for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14): + loop = global_event_loop() + + for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 2**15, 2**16): test_string = x * "a" output = self._testPipeReader(test_string) self.assertEqual(test_string, output, @@ -83,3 +118,7 @@ class PopenPipeTestCase(TestCase): output = self._testPipeLogger(test_string) self.assertEqual(test_string, output, "x = %s, len(output) = %s" % (x, len(output))) + + output = loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop)) + self.assertEqual(test_string, output, + "x = %s, len(output) = %s" % (x, len(output))) diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py index a4258f350..6b03988a1 100644 --- a/lib/portage/util/_async/PipeLogger.py +++ b/lib/portage/util/_async/PipeLogger.py @@ -8,6 +8,9 @@ import sys import portage from portage import os, _encodings, _unicode_encode +from portage.util.futures import asyncio +from portage.util.futures.compat_coroutine import coroutine +from portage.util.futures.unix_events import _set_nonblocking from _emerge.AbstractPollTask import AbstractPollTask class PipeLogger(AbstractPollTask): @@ -21,13 +24,15 @@ class PipeLogger(AbstractPollTask): """ __slots__ = ("input_fd", "log_file_path", "stdout_fd") + \ - 
("_log_file", "_log_file_real") + ("_io_loop_task", "_log_file", "_log_file_real") def _start(self): log_file_path = self.log_file_path - if log_file_path is not None: - + if hasattr(log_file_path, 'write'): + self._log_file =