Re: [gentoo-portage-dev] [PATCH 1/2] PipeLogger: non-blocking write to pipe (bug 709746)
On Fri, 19 Jun 2020 13:39:18 -0700 Zac Medico wrote: > Add support to write to a non-blocking pipe instead of a > log file. This is needed for the purposes of bug 709746, > where PipeLogger will write to a pipe that is drained > by another PipeLogger instance which is running in the same > process. > > Bug: https://bugs.gentoo.org/709746 > Signed-off-by: Zac Medico > --- > lib/portage/tests/process/test_PipeLogger.py | 58 > lib/portage/util/_async/PipeLogger.py| 73 > +++- 2 files changed, 115 insertions(+), 16 > deletions(-) create mode 100644 > lib/portage/tests/process/test_PipeLogger.py > > diff --git a/lib/portage/tests/process/test_PipeLogger.py > b/lib/portage/tests/process/test_PipeLogger.py new file mode 100644 > index 0..2bd94cf39 > --- /dev/null > +++ b/lib/portage/tests/process/test_PipeLogger.py > @@ -0,0 +1,58 @@ > +# Copyright 2020 Gentoo Authors > +# Distributed under the terms of the GNU General Public License v2 > + > +from portage import os > +from portage.tests import TestCase > +from portage.util._async.PipeLogger import PipeLogger > +from portage.util.futures import asyncio > +from portage.util.futures._asyncio.streams import _reader, _writer > +from portage.util.futures.compat_coroutine import coroutine, > coroutine_return +from portage.util.futures.unix_events import > _set_nonblocking + > + > +class PipeLoggerTestCase(TestCase): > + > + @coroutine > + def _testPipeLoggerToPipe(self, test_string, loop=None): > + """ > + Test PipeLogger writing to a pipe connected to a > PipeReader. > + This verifies that PipeLogger does not deadlock when > writing > + to a pipe that's drained by a PipeReader running in > the same > + process (requires non-blocking write). 
> + """ > + > + input_fd, writer_pipe = os.pipe() > + _set_nonblocking(writer_pipe) > + writer_pipe = os.fdopen(writer_pipe, 'wb', 0) > + writer = asyncio.ensure_future(_writer(writer_pipe, > test_string.encode('ascii'), loop=loop), loop=loop) > + writer.add_done_callback(lambda writer: > writer_pipe.close()) + > + pr, pw = os.pipe() > + > + consumer = PipeLogger(background=True, > + input_fd=input_fd, > + log_file_path=os.fdopen(pw, 'wb', 0), > + scheduler=loop) > + consumer.start() > + > + # Before starting the reader, wait here for a > moment, in order > + # to exercise PipeLogger's handling of EAGAIN during > write. > + yield asyncio.wait([writer], timeout=0.01) > + > + reader = _reader(pr, loop=loop) > + yield writer > + content = yield reader > + yield consumer.async_wait() > + > + self.assertEqual(consumer.returncode, os.EX_OK) > + > + coroutine_return(content.decode('ascii', 'replace')) > + > + def testPipeLogger(self): > + loop = asyncio._wrap_loop() > + > + for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, > 2**13, 2**14, 2**17, 2**17 + 1): > + test_string = x * "a" > + output = > loop.run_until_complete(self._testPipeLoggerToPipe(test_string, > loop=loop)) > + self.assertEqual(test_string, output, > + "x = %s, len(output) = %s" % (x, > len(output))) diff --git a/lib/portage/util/_async/PipeLogger.py > b/lib/portage/util/_async/PipeLogger.py index a4258f350..ce8afb846 > 100644 --- a/lib/portage/util/_async/PipeLogger.py > +++ b/lib/portage/util/_async/PipeLogger.py > @@ -8,6 +8,10 @@ import sys > > import portage > from portage import os, _encodings, _unicode_encode > +from portage.util.futures import asyncio > +from portage.util.futures._asyncio.streams import _writer > +from portage.util.futures.compat_coroutine import coroutine > +from portage.util.futures.unix_events import _set_nonblocking > from _emerge.AbstractPollTask import AbstractPollTask > > class PipeLogger(AbstractPollTask): > @@ -21,13 +25,16 @@ class PipeLogger(AbstractPollTask): > """ > > 
__slots__ = ("input_fd", "log_file_path", "stdout_fd") + \ > - ("_log_file", "_log_file_real") > + ("_io_loop_task", "_log_file", "_log_file_nb", > "_log_file_real") > def _start(self): > > log_file_path = self.log_file_path > - if log_file_path is not None: > - > + if hasattr(log_file_path, 'write'): > + self._log_file_nb = True > + self._log_file = log_file_path > + _set_nonblocking(self._log_file.fileno()) > + elif log_file_path is not None: > self._log_file = > open(_unicode_encode(log_file_path, encoding=_encodings['fs'], > errors='strict'), mode='ab') if
[gentoo-portage-dev] [PATCH 1/2] PipeLogger: non-blocking write to pipe (bug 709746)
Add support to write to a non-blocking pipe instead of a log file. This is needed for the purposes of bug 709746, where PipeLogger will write to a pipe that is drained by another PipeLogger instance which is running in the same process. Bug: https://bugs.gentoo.org/709746 Signed-off-by: Zac Medico --- lib/portage/tests/process/test_PipeLogger.py | 58 lib/portage/util/_async/PipeLogger.py| 73 +++- 2 files changed, 115 insertions(+), 16 deletions(-) create mode 100644 lib/portage/tests/process/test_PipeLogger.py diff --git a/lib/portage/tests/process/test_PipeLogger.py b/lib/portage/tests/process/test_PipeLogger.py new file mode 100644 index 0..2bd94cf39 --- /dev/null +++ b/lib/portage/tests/process/test_PipeLogger.py @@ -0,0 +1,58 @@ +# Copyright 2020 Gentoo Authors +# Distributed under the terms of the GNU General Public License v2 + +from portage import os +from portage.tests import TestCase +from portage.util._async.PipeLogger import PipeLogger +from portage.util.futures import asyncio +from portage.util.futures._asyncio.streams import _reader, _writer +from portage.util.futures.compat_coroutine import coroutine, coroutine_return +from portage.util.futures.unix_events import _set_nonblocking + + +class PipeLoggerTestCase(TestCase): + + @coroutine + def _testPipeLoggerToPipe(self, test_string, loop=None): + """ + Test PipeLogger writing to a pipe connected to a PipeReader. + This verifies that PipeLogger does not deadlock when writing + to a pipe that's drained by a PipeReader running in the same + process (requires non-blocking write). 
+ """ + + input_fd, writer_pipe = os.pipe() + _set_nonblocking(writer_pipe) + writer_pipe = os.fdopen(writer_pipe, 'wb', 0) + writer = asyncio.ensure_future(_writer(writer_pipe, test_string.encode('ascii'), loop=loop), loop=loop) + writer.add_done_callback(lambda writer: writer_pipe.close()) + + pr, pw = os.pipe() + + consumer = PipeLogger(background=True, + input_fd=input_fd, + log_file_path=os.fdopen(pw, 'wb', 0), + scheduler=loop) + consumer.start() + + # Before starting the reader, wait here for a moment, in order + # to exercise PipeLogger's handling of EAGAIN during write. + yield asyncio.wait([writer], timeout=0.01) + + reader = _reader(pr, loop=loop) + yield writer + content = yield reader + yield consumer.async_wait() + + self.assertEqual(consumer.returncode, os.EX_OK) + + coroutine_return(content.decode('ascii', 'replace')) + + def testPipeLogger(self): + loop = asyncio._wrap_loop() + + for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 2**17, 2**17 + 1): + test_string = x * "a" + output = loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop)) + self.assertEqual(test_string, output, + "x = %s, len(output) = %s" % (x, len(output))) diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py index a4258f350..ce8afb846 100644 --- a/lib/portage/util/_async/PipeLogger.py +++ b/lib/portage/util/_async/PipeLogger.py @@ -8,6 +8,10 @@ import sys import portage from portage import os, _encodings, _unicode_encode +from portage.util.futures import asyncio +from portage.util.futures._asyncio.streams import _writer +from portage.util.futures.compat_coroutine import coroutine +from portage.util.futures.unix_events import _set_nonblocking from _emerge.AbstractPollTask import AbstractPollTask class PipeLogger(AbstractPollTask): @@ -21,13 +25,16 @@ class PipeLogger(AbstractPollTask): """ __slots__ = ("input_fd", "log_file_path", "stdout_fd") + \ - ("_log_file", "_log_file_real") + ("_io_loop_task", "_log_file", 
"_log_file_nb", "_log_file_real") def _start(self): log_file_path = self.log_file_path - if log_file_path is not None: - + if hasattr(log_file_path, 'write'): + self._log_file_nb = True + self._log_file = log_file_path + _set_nonblocking(self._log_file.fileno()) + elif log_file_path is not None: self._log_file = open(_unicode_encode(log_file_path, encoding=_encodings['fs'], errors='strict'), mode='ab') if log_file_path.endswith('.gz'): @@ -40,9 +47,9 @@ class PipeLogger(AbstractPollTask): mode=0o660) if