Re: [gentoo-portage-dev] [PATCH 1/2] PipeLogger: non-blocking write to pipe (bug 709746)

2020-06-22 Thread Brian Dolbec
On Fri, 19 Jun 2020 13:39:18 -0700
Zac Medico  wrote:

> Add support to write to a non-blocking pipe instead of a
> log file. This is needed for the purposes of bug 709746,
> where PipeLogger will write to a pipe that is drained
> by another PipeLogger instance which is running in the same
> process.
> 
> Bug: https://bugs.gentoo.org/709746
> Signed-off-by: Zac Medico 
> ---
>  lib/portage/tests/process/test_PipeLogger.py | 58 
>  lib/portage/util/_async/PipeLogger.py| 73
> +++- 2 files changed, 115 insertions(+), 16
> deletions(-) create mode 100644
> lib/portage/tests/process/test_PipeLogger.py
> 
> diff --git a/lib/portage/tests/process/test_PipeLogger.py
> b/lib/portage/tests/process/test_PipeLogger.py new file mode 100644
> index 0..2bd94cf39
> --- /dev/null
> +++ b/lib/portage/tests/process/test_PipeLogger.py
> @@ -0,0 +1,58 @@
> +# Copyright 2020 Gentoo Authors
> +# Distributed under the terms of the GNU General Public License v2
> +
> +from portage import os
> +from portage.tests import TestCase
> +from portage.util._async.PipeLogger import PipeLogger
> +from portage.util.futures import asyncio
> +from portage.util.futures._asyncio.streams import _reader, _writer
> +from portage.util.futures.compat_coroutine import coroutine,
> coroutine_return +from portage.util.futures.unix_events import
> _set_nonblocking +
> +
> +class PipeLoggerTestCase(TestCase):
> +
> + @coroutine
> + def _testPipeLoggerToPipe(self, test_string, loop=None):
> + """
> + Test PipeLogger writing to a pipe connected to a
> PipeReader.
> + This verifies that PipeLogger does not deadlock when
> writing
> + to a pipe that's drained by a PipeReader running in
> the same
> + process (requires non-blocking write).
> + """
> +
> + input_fd, writer_pipe = os.pipe()
> + _set_nonblocking(writer_pipe)
> + writer_pipe = os.fdopen(writer_pipe, 'wb', 0)
> + writer = asyncio.ensure_future(_writer(writer_pipe,
> test_string.encode('ascii'), loop=loop), loop=loop)
> + writer.add_done_callback(lambda writer:
> writer_pipe.close()) +
> + pr, pw = os.pipe()
> +
> + consumer = PipeLogger(background=True,
> + input_fd=input_fd,
> + log_file_path=os.fdopen(pw, 'wb', 0),
> + scheduler=loop)
> + consumer.start()
> +
> + # Before starting the reader, wait here for a
> moment, in order
> + # to exercise PipeLogger's handling of EAGAIN during
> write.
> + yield asyncio.wait([writer], timeout=0.01)
> +
> + reader = _reader(pr, loop=loop)
> + yield writer
> + content = yield reader
> + yield consumer.async_wait()
> +
> + self.assertEqual(consumer.returncode, os.EX_OK)
> +
> + coroutine_return(content.decode('ascii', 'replace'))
> +
> + def testPipeLogger(self):
> + loop = asyncio._wrap_loop()
> +
> + for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12,
> 2**13, 2**14, 2**17, 2**17 + 1):
> + test_string = x * "a"
> + output =
> loop.run_until_complete(self._testPipeLoggerToPipe(test_string,
> loop=loop))
> + self.assertEqual(test_string, output,
> + "x = %s, len(output) = %s" % (x,
> len(output))) diff --git a/lib/portage/util/_async/PipeLogger.py
> b/lib/portage/util/_async/PipeLogger.py index a4258f350..ce8afb846
> 100644 --- a/lib/portage/util/_async/PipeLogger.py
> +++ b/lib/portage/util/_async/PipeLogger.py
> @@ -8,6 +8,10 @@ import sys
>  
>  import portage
>  from portage import os, _encodings, _unicode_encode
> +from portage.util.futures import asyncio
> +from portage.util.futures._asyncio.streams import _writer
> +from portage.util.futures.compat_coroutine import coroutine
> +from portage.util.futures.unix_events import _set_nonblocking
>  from _emerge.AbstractPollTask import AbstractPollTask
>  
>  class PipeLogger(AbstractPollTask):
> @@ -21,13 +25,16 @@ class PipeLogger(AbstractPollTask):
>   """
>  
>   __slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
> - ("_log_file", "_log_file_real")
> + ("_io_loop_task", "_log_file", "_log_file_nb",
> "_log_file_real") 
>   def _start(self):
>  
>   log_file_path = self.log_file_path
> - if log_file_path is not None:
> -
> + if hasattr(log_file_path, 'write'):
> + self._log_file_nb = True
> + self._log_file = log_file_path
> + _set_nonblocking(self._log_file.fileno())
> + elif log_file_path is not None:
>   self._log_file =
> open(_unicode_encode(log_file_path, encoding=_encodings['fs'],
> errors='strict'), mode='ab') if 

[gentoo-portage-dev] [PATCH 1/2] PipeLogger: non-blocking write to pipe (bug 709746)

2020-06-19 Thread Zac Medico
Add support to write to a non-blocking pipe instead of a
log file. This is needed for the purposes of bug 709746,
where PipeLogger will write to a pipe that is drained
by another PipeLogger instance which is running in the same
process.

Bug: https://bugs.gentoo.org/709746
Signed-off-by: Zac Medico 
---
 lib/portage/tests/process/test_PipeLogger.py | 58 
 lib/portage/util/_async/PipeLogger.py| 73 +++-
 2 files changed, 115 insertions(+), 16 deletions(-)
 create mode 100644 lib/portage/tests/process/test_PipeLogger.py

diff --git a/lib/portage/tests/process/test_PipeLogger.py 
b/lib/portage/tests/process/test_PipeLogger.py
new file mode 100644
index 0..2bd94cf39
--- /dev/null
+++ b/lib/portage/tests/process/test_PipeLogger.py
@@ -0,0 +1,58 @@
+# Copyright 2020 Gentoo Authors
+# Distributed under the terms of the GNU General Public License v2
+
+from portage import os
+from portage.tests import TestCase
+from portage.util._async.PipeLogger import PipeLogger
+from portage.util.futures import asyncio
+from portage.util.futures._asyncio.streams import _reader, _writer
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return
+from portage.util.futures.unix_events import _set_nonblocking
+
+
+class PipeLoggerTestCase(TestCase):
+
+   @coroutine
+   def _testPipeLoggerToPipe(self, test_string, loop=None):
+   """
+   Test PipeLogger writing to a pipe connected to a PipeReader.
+   This verifies that PipeLogger does not deadlock when writing
+   to a pipe that's drained by a PipeReader running in the same
+   process (requires non-blocking write).
+   """
+
+   input_fd, writer_pipe = os.pipe()
+   _set_nonblocking(writer_pipe)
+   writer_pipe = os.fdopen(writer_pipe, 'wb', 0)
+   writer = asyncio.ensure_future(_writer(writer_pipe, 
test_string.encode('ascii'), loop=loop), loop=loop)
+   writer.add_done_callback(lambda writer: writer_pipe.close())
+
+   pr, pw = os.pipe()
+
+   consumer = PipeLogger(background=True,
+   input_fd=input_fd,
+   log_file_path=os.fdopen(pw, 'wb', 0),
+   scheduler=loop)
+   consumer.start()
+
+   # Before starting the reader, wait here for a moment, in order
+   # to exercise PipeLogger's handling of EAGAIN during write.
+   yield asyncio.wait([writer], timeout=0.01)
+
+   reader = _reader(pr, loop=loop)
+   yield writer
+   content = yield reader
+   yield consumer.async_wait()
+
+   self.assertEqual(consumer.returncode, os.EX_OK)
+
+   coroutine_return(content.decode('ascii', 'replace'))
+
+   def testPipeLogger(self):
+   loop = asyncio._wrap_loop()
+
+   for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 
2**17, 2**17 + 1):
+   test_string = x * "a"
+   output = 
loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop))
+   self.assertEqual(test_string, output,
+   "x = %s, len(output) = %s" % (x, len(output)))
diff --git a/lib/portage/util/_async/PipeLogger.py 
b/lib/portage/util/_async/PipeLogger.py
index a4258f350..ce8afb846 100644
--- a/lib/portage/util/_async/PipeLogger.py
+++ b/lib/portage/util/_async/PipeLogger.py
@@ -8,6 +8,10 @@ import sys
 
 import portage
 from portage import os, _encodings, _unicode_encode
+from portage.util.futures import asyncio
+from portage.util.futures._asyncio.streams import _writer
+from portage.util.futures.compat_coroutine import coroutine
+from portage.util.futures.unix_events import _set_nonblocking
 from _emerge.AbstractPollTask import AbstractPollTask
 
 class PipeLogger(AbstractPollTask):
@@ -21,13 +25,16 @@ class PipeLogger(AbstractPollTask):
"""
 
__slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
-   ("_log_file", "_log_file_real")
+   ("_io_loop_task", "_log_file", "_log_file_nb", "_log_file_real")
 
def _start(self):
 
log_file_path = self.log_file_path
-   if log_file_path is not None:
-
+   if hasattr(log_file_path, 'write'):
+   self._log_file_nb = True
+   self._log_file = log_file_path
+   _set_nonblocking(self._log_file.fileno())
+   elif log_file_path is not None:
self._log_file = open(_unicode_encode(log_file_path,
encoding=_encodings['fs'], errors='strict'), 
mode='ab')
if log_file_path.endswith('.gz'):
@@ -40,9 +47,9 @@ class PipeLogger(AbstractPollTask):
mode=0o660)
 
if