Diff
Modified: trunk/Tools/ChangeLog (266189 => 266190)
--- trunk/Tools/ChangeLog 2020-08-26 20:42:28 UTC (rev 266189)
+++ trunk/Tools/ChangeLog 2020-08-26 21:01:19 UTC (rev 266190)
@@ -1,3 +1,37 @@
+2020-08-26 Jonathan Bedard <[email protected]>
+
+ [webkitcorepy] Standard Popen mocking API
+ https://bugs.webkit.org/show_bug.cgi?id=215712
+ <rdar://problem/67501911>
+
+ Reviewed by Dewei Zhu.
+
+ * Scripts/libraries/webkitcorepy/README.md: Add mocks.Subprocess documentation.
+ * Scripts/libraries/webkitcorepy/webkitcorepy/__init__.py: Bump version.
+ * Scripts/libraries/webkitcorepy/webkitcorepy/mocks/__init__.py: Export Subprocess, ProcessCompletion.
+ * Scripts/libraries/webkitcorepy/webkitcorepy/mocks/popen.py: Added.
+ (Popen): Mock Popen object (for both Python 2 and 3). This object extracts a ProcessCompletion object from the
+ mocked Subprocess stack and behaves according to the content of that object.
+ * Scripts/libraries/webkitcorepy/webkitcorepy/mocks/subprocess.py: Added.
+ (PopenBase): Base class to share code between Python 2 and 3 constructor.
+ (Popen): Mock Popen object (for both Python 2 and 3). This object extracts a ProcessCompletion object from the
+ mocked Subprocess stack and behaves according to the content of that object.
+ * Scripts/libraries/webkitcorepy/webkitcorepy/mocks/subprocess.py: Added.
+ (ProcessCompletion): Class which captures the result of a mock Popen call.
+ (Subprocess):
+ (Subprocess.CommandRoute): Object which routes a set of subprocess arguments to a specific ProcessCompletion or callback which
+ generates a ProcessCompletion.
+ (Subprocess.completion_generator_for): Given a file, search through the mocked Subprocess stack to find the
+ first completion for the provided file.
+ (Subprocess.completion_for): Extract file name from arguments, find the completion generator, and call
+ it with the provided arguments, working directory and stdin.
+ (Subprocess.__init__):
+ * Scripts/libraries/webkitcorepy/webkitcorepy/tests/mocks/subprocess_unittest.py: Added.
+ (MockSubprocess):
+ (MockCheckOutput):
+ (MockCheckCall):
+ (MockRun):
+
2020-08-26 Eric Carlson <[email protected]>
TestWebKitAPI.AudioRoutingArbitration.Deletion is a constant failure
Modified: trunk/Tools/Scripts/libraries/webkitcorepy/README.md (266189 => 266190)
--- trunk/Tools/Scripts/libraries/webkitcorepy/README.md 2020-08-26 20:42:28 UTC (rev 266189)
+++ trunk/Tools/Scripts/libraries/webkitcorepy/README.md 2020-08-26 21:01:19 UTC (rev 266190)
@@ -76,3 +76,35 @@
result = run([sys.executable, '-c', 'print("message")'], capture_output=True, encoding='utf-8')
```
+
+Mocking of subprocess commands:
+```
+from webkitcorepy import mocks, run
+
+with mocks.Subprocess(
+ 'ls', completion=mocks.ProcessCompletion(returncode=0, stdout='file1.txt\nfile2.txt\n'),
+):
+ result = run(['ls'], capture_output=True, encoding='utf-8')
+ assert result.returncode == 0
+ assert result.stdout == 'file1.txt\nfile2.txt\n'
+```
+The mocking system for subprocess also supports other subprocess APIs based on Popen:
+```
+with mocks.Subprocess(
+ 'ls', completion=mocks.ProcessCompletion(returncode=0, stdout='file1.txt\nfile2.txt\n'),
+):
+ assert subprocess.check_output(['ls']) == b'file1.txt\nfile2.txt\n'
+ assert subprocess.check_call(['ls']) == 0
+```
+For writing integration tests, the mocking system for subprocess supports mocking multiple process calls at the same time:
+```
+with mocks.Subprocess(
+ mocks.Subprocess.CommandRoute('command-a', 'argument', completion=mocks.ProcessCompletion(returncode=0)),
+ mocks.Subprocess.CommandRoute('command-b', completion=mocks.ProcessCompletion(returncode=-1)),
+):
+ result = run(['command-a', 'argument'])
+ assert result.returncode == 0
+
+ result = run(['command-b'])
+ assert result.returncode == -1
+```
Modified: trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/__init__.py (266189 => 266190)
--- trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/__init__.py 2020-08-26 20:42:28 UTC (rev 266189)
+++ trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/__init__.py 2020-08-26 21:01:19 UTC (rev 266190)
@@ -35,7 +35,7 @@
from webkitcorepy.subprocess_utils import TimeoutExpired, CompletedProcess, run
from webkitcorepy.output_capture import LoggerCapture, OutputCapture, OutputDuplicate
-version = Version(0, 4, 0)
+version = Version(0, 4, 1)
from webkitcorepy.autoinstall import Package, AutoInstall
if sys.version_info > (3, 0):
Modified: trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/mocks/__init__.py (266189 => 266190)
--- trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/mocks/__init__.py 2020-08-26 20:42:28 UTC (rev 266189)
+++ trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/mocks/__init__.py 2020-08-26 21:01:19 UTC (rev 266190)
@@ -22,3 +22,4 @@
from webkitcorepy.mocks.context_stack import ContextStack
from webkitcorepy.mocks.time_ import Time
+from webkitcorepy.mocks.subprocess import ProcessCompletion, Subprocess
Added: trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/mocks/popen.py (0 => 266190)
--- trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/mocks/popen.py (rev 0)
+++ trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/mocks/popen.py 2020-08-26 21:01:19 UTC (rev 266190)
@@ -0,0 +1,241 @@
+# Copyright (C) 2020 Apple Inc. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' AND ANY
+# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY
+# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import os
+import io
+import subprocess
+import signal
+import sys
+import time
+
+from webkitcorepy import log, string_utils, Timeout, TimeoutExpired, unicode
+from webkitcorepy.mocks import Subprocess
+
+# This file is a mocked version of the subprocess.Popen object. This object differs slightly between Python 2 and 3.
+# This object is not a complete mock of subprocess.Popen, but it does enable thorough testing of code which uses Popen,
+# in particular, the subprocess.check_call, subprocess.check_output and subprocess.run commands.
+#
+# If you find yourself needing to edit this file, lean heavily on the testing in
+# webkitcorepy.tests.mocks.subprocess_unittest and Python's implementation of Popen.
+
+
class PopenBase(object):
    """
    Behavior shared by the Python 2 and Python 3 mock Popen objects.

    Instead of spawning a process, the object asks the mocked Subprocess stack for a
    ProcessCompletion matching its arguments, then replays that completion's output and
    return code. Subclasses are expected to set ``text_mode``.
    """

    # pid handed to the next mock process; advanced on the class so every mock gets its own pid.
    NEXT_PID = os.getpid() + 1

    def __init__(self, args, bufsize=None, cwd=None, stdin=None, stdout=None, stderr=None):
        """
        Record the arguments and set up the stand-in streams.

        Raises TypeError if bufsize is not an integer, and whatever
        Subprocess.completion_generator_for raises when no route mocks args[0].
        """
        self._completion = None
        self._communication_started = False
        if bufsize is None:
            bufsize = -1
        if not isinstance(bufsize, int):
            raise TypeError("bufsize must be an integer")

        self._args = args
        self._cwd = cwd

        self.returncode = None

        # -1 is the value of subprocess.PIPE: piped streams are backed by in-memory buffers.
        self.stdin = string_utils.BytesIO() if stdin is None or stdin == -1 else stdin
        self.stdout = string_utils.BytesIO() if stdout == -1 else stdout
        self._stdout_type = bytes if stdout == -1 else str
        self.stderr = string_utils.BytesIO() if stderr == -1 else stderr
        self._stderr_type = bytes if stderr == -1 else str

        # Result intentionally discarded: the call raises when the program is not mocked.
        Subprocess.completion_generator_for(args[0])

        # Update the class attribute; ``self.NEXT_PID += 1`` would only create an instance
        # attribute and leave every mock process with the same pid.
        self.pid = PopenBase.NEXT_PID
        PopenBase.NEXT_PID += 1

        self._start_time = time.time()

    @property
    def universal_newlines(self):
        return self.text_mode

    @universal_newlines.setter
    def universal_newlines(self, universal_newlines):
        self.text_mode = bool(universal_newlines)

    def poll(self):
        """
        Resolve the completion on first use and return the return code.

        Returns None while the mock process is still "running", i.e. its elapsed time has
        not passed yet or the completion never finishes (elapsed is None).
        """
        if not self._completion:
            self.stdin.seek(0)
            self._completion = Subprocess.completion_for(*self._args, cwd=self._cwd, input=self.stdin.read())

            # Replay the output exactly once, falling back to the real streams when not piped.
            (self.stdout or sys.stdout).write(
                string_utils.decode(self._completion.stdout, target_type=self._stdout_type))
            (self.stdout or sys.stdout).flush()

            (self.stderr or sys.stderr).write(
                string_utils.decode(self._completion.stderr, target_type=self._stderr_type))
            (self.stderr or sys.stderr).flush()

        elapsed = self._completion.elapsed
        # Only a process which has not finished yet (and was not killed) may complete here.
        # A completion with elapsed None hangs forever, so it never completes by polling.
        if self.returncode is None and elapsed is not None and time.time() >= self._start_time + elapsed:
            self.returncode = self._completion.returncode
            if self.stdout:
                self.stdout.seek(0)
            if self.stderr:
                self.stderr.seek(0)

        return self.returncode

    def send_signal(self, sig):
        """Terminate the mock process with return code -1; only SIGTERM and SIGKILL are supported."""
        if self.returncode is not None:
            return

        if sig not in [signal.SIGTERM, signal.SIGKILL]:
            raise ValueError('Mock Popen object cannot handle signal {}'.format(sig))
        log.critical('Mock process {} send signal {}'.format(self.pid, sig))
        self.returncode = -1

    def terminate(self):
        self.send_signal(signal.SIGTERM)

    def kill(self):
        self.send_signal(signal.SIGKILL)
+
+
if sys.version_info > (3, 0):
    class Popen(PopenBase):
        """Mock of the Python 3 subprocess.Popen object."""

        def __init__(self, args, bufsize=None, executable=None,
                     stdin=None, stdout=None, stderr=None,
                     preexec_fn=None, close_fds=True,
                     shell=False, cwd=None, env=None, universal_newlines=None,
                     startupinfo=None, creationflags=0,
                     restore_signals=True, start_new_session=False,
                     pass_fds=(), encoding=None, errors=None, text=None):

            super(Popen, self).__init__(args, bufsize=bufsize, cwd=cwd, stdin=stdin, stdout=stdout, stderr=stderr)

            if pass_fds and not close_fds:
                log.warn("pass_fds overriding close_fds.")

            for arg in args:
                if not isinstance(arg, (str, bytes, os.PathLike)):
                    # Format the message; passing the template and values as separate
                    # TypeError arguments leaves the placeholders unfilled.
                    raise TypeError(
                        'expected str, bytes or os.PathLike object, not {}'.format(type(arg).__name__),
                    )

            self.args = args
            self.encoding = encoding
            self.errors = errors

            if (text is not None and universal_newlines is not None and bool(universal_newlines) != bool(text)):
                raise subprocess.SubprocessError('Cannot disambiguate when both text and universal_newlines are supplied but different. Pass one or the other.')

            self.text_mode = encoding or errors or text or universal_newlines

            # In text mode the byte streams are wrapped so callers read and write str.
            if self.stdin is not None and self.text_mode:
                self.stdin = io.TextIOWrapper(self.stdin, write_through=True, line_buffering=(bufsize == 1), encoding=encoding, errors=errors)
            if self.stdout is not None and self.text_mode:
                self.stdout = io.TextIOWrapper(self.stdout, encoding=encoding, errors=errors)
                self._stdout_type = str
            if self.stderr is not None and self.text_mode:
                self.stderr = io.TextIOWrapper(self.stderr, encoding=encoding, errors=errors)
                self._stderr_type = str

        def communicate(self, input=None, timeout=None):
            """Write input to stdin, wait for completion and return the (stdout, stderr) contents."""
            if self._communication_started and input:
                raise ValueError('Cannot send input after starting communication')

            self._communication_started = True
            if input:
                self.stdin.write(string_utils.encode(input))
            self.wait(timeout=timeout)
            return self.stdout.read() if self.stdout else None, self.stderr.read() if self.stderr else None

        def wait(self, timeout=None):
            """
            Wait for the mock process to finish and return its return code.

            Raises TimeoutExpired when the completion takes longer than timeout, and
            ValueError when the completion hangs (elapsed is None) and no timeout is given.
            """
            if self.poll() is not None:
                return self.returncode

            if timeout and (self._completion.elapsed is None or timeout < self._completion.elapsed):
                raise TimeoutExpired(self._args, timeout)

            if self._completion.elapsed is None:
                raise ValueError('Running a command that hangs without a timeout')

            if self._completion.elapsed:
                time.sleep(self._completion.elapsed)

            self.returncode = self._completion.returncode
            if self.stdout:
                self.stdout.seek(0)
            if self.stderr:
                self.stderr.seek(0)
            # subprocess.call() returns the result of wait(), so the return code must be returned.
            return self.returncode

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            if self.stdout:
                self.stdout.close()
            if self.stderr:
                self.stderr.close()
            if self.stdin:
                self.stdin.close()

else:
    class Popen(PopenBase):
        """Mock of the Python 2 subprocess.Popen object."""

        def __init__(self, args, bufsize=-1, executable=None,
                     stdin=None, stdout=None, stderr=None,
                     preexec_fn=None, close_fds=True,
                     shell=False, cwd=None, env=None, universal_newlines=None,
                     startupinfo=None, creationflags=0):

            super(Popen, self).__init__(args, bufsize=bufsize, cwd=cwd, stdin=stdin, stdout=stdout, stderr=stderr)

            for index, arg in enumerate(args):
                if not isinstance(arg, (str, unicode)):
                    raise TypeError('execv() arg {} must contain only strings'.format(index + 1))

            self.text_mode = universal_newlines

        def communicate(self, input=None):
            """Write input to stdin, wait for completion and return the (stdout, stderr) contents."""
            if self._communication_started and input:
                raise ValueError('Cannot send input after starting communication')

            self._communication_started = True
            if input:
                self.stdin.write(input)
            self.wait()
            return self.stdout.read() if self.stdout else None, self.stderr.read() if self.stderr else None

        def wait(self):
            """
            Wait for the mock process to finish and return its return code.

            Python 2's wait() takes no timeout, so the timeout is read from Timeout.difference().
            Raises TimeoutExpired when the completion takes longer than that timeout, and
            ValueError when the completion hangs (elapsed is None) and no timeout applies.
            """
            if self.poll() is not None:
                return self.returncode

            # Need to check the timeout context
            timeout = Timeout.difference()
            if timeout and (self._completion.elapsed is None or timeout < self._completion.elapsed):
                raise TimeoutExpired(self._args, timeout)

            if self._completion.elapsed is None:
                raise ValueError('Running a command that hangs without a timeout')

            if self._completion.elapsed:
                time.sleep(self._completion.elapsed)

            self.returncode = self._completion.returncode
            if self.stdout:
                self.stdout.seek(0)
            if self.stderr:
                self.stderr.seek(0)
            # subprocess.call() returns the result of wait(), so the return code must be returned.
            return self.returncode
Added: trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/mocks/subprocess.py (0 => 266190)
--- trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/mocks/subprocess.py (rev 0)
+++ trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/mocks/subprocess.py 2020-08-26 21:01:19 UTC (rev 266190)
@@ -0,0 +1,187 @@
+# Copyright (C) 2020 Apple Inc. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' AND ANY
+# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY
+# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import re
+import sys
+
+from functools import cmp_to_key
+from webkitcorepy import string_utils, unicode
+from webkitcorepy.mocks import ContextStack
+
+
class ProcessCompletion(object):
    """
    Outcome of a mocked process: return code, encoded stdout/stderr and how long the
    process appears to run. A return code of None is stored as 1; missing output is stored
    as empty bytes.
    """

    def __init__(self, returncode=None, stdout=None, stderr=None, elapsed=0):
        if returncode is None:
            returncode = 1
        self.returncode = returncode
        self.stdout = b'' if not stdout else string_utils.encode(stdout)
        self.stderr = b'' if not stderr else string_utils.encode(stderr)
        self.elapsed = elapsed
+
+
class Subprocess(ContextStack):
    """
    Organize ProcessCompletions so calls to subprocess functions will return a ProcessCompletion for
    a set of arguments or trigger a ProcessCompletion generator. mocks.Subprocess makes an attempt to
    prioritize CommandRoute objects for a given set of arguments such that the most specific applicable route
    is preferred.

    Example usage mocking a single command:
        with mocks.Subprocess(
            'ls', completion=mocks.ProcessCompletion(returncode=0, stdout='file1.txt\nfile2.txt\n'),
        ):
            result = run(['ls'], capture_output=True, encoding='utf-8')
            assert result.returncode == 0
            assert result.stdout == 'file1.txt\nfile2.txt\n'

    Example usage mocking a set of commands:
        with mocks.Subprocess(
            mocks.Subprocess.CommandRoute('command-a', 'argument', completion=mocks.ProcessCompletion(returncode=0)),
            mocks.Subprocess.CommandRoute('command-b', completion=mocks.ProcessCompletion(returncode=-1)),
        ):
            result = run(['command-a', 'argument'])
            assert result.returncode == 0

            result = run(['command-b'])
            assert result.returncode == -1
    """
    top = None

    class CommandRoute(object):
        """Route a set of subprocess arguments to a ProcessCompletion, or to a callback generating one."""

        def __init__(self, *args, **kwargs):
            """
            args are the leading process arguments this route applies to; each may be a plain
            string or an object with a ``match`` method. Keyword arguments: completion, cwd,
            input and generator. Raises TypeError for any other keyword and ValueError when
            no arguments are given.
            """
            completion = kwargs.pop('completion', ProcessCompletion())
            cwd = kwargs.pop('cwd', None)
            input = kwargs.pop('input', None)
            generator = kwargs.pop('generator', None)
            if kwargs:
                # dict views are not subscriptable in Python 3, so pick the first key by iteration.
                raise TypeError('__init__() got an unexpected keyword argument {}'.format(next(iter(kwargs))))

            # args always arrives as a tuple, so only the empty case needs handling.
            if not args:
                raise ValueError('Arguments must be provided to a CommandRoute')
            self.args = args

            self.generator = generator or (lambda *args, **kwargs: completion)
            self.cwd = cwd
            self.input = string_utils.encode(input) if input else None

        def matches(self, *args, **kwargs):
            """Return True if this route applies to the provided arguments, cwd and input."""
            cwd = kwargs.pop('cwd', None)
            input = kwargs.pop('input', None)
            if kwargs:
                raise TypeError('matches() got an unexpected keyword argument {}'.format(next(iter(kwargs))))

            if len(self.args) > len(args):
                return False

            # The route may be shorter than the call; only the route's own arguments are compared.
            for expected, actual in zip(self.args, args):
                if expected is None:
                    return False

                if expected == actual:
                    continue
                if hasattr(expected, 'match') and expected.match(actual):
                    continue
                # Plain strings are also tried as regular expressions anchored at the start.
                if re.match(expected, actual):
                    continue
                return False

            if self.cwd is not None and cwd != self.cwd:
                return False
            if self.input is not None and input != self.input:
                return False
            return True

        def __call__(self, *args, **kwargs):
            """Invoke the generator with the arguments, working directory and input."""
            cwd = kwargs.pop('cwd', None)
            input = kwargs.pop('input', None)
            if kwargs:
                raise TypeError('__call__() got an unexpected keyword argument {}'.format(next(iter(kwargs))))
            return self.generator(*args, cwd=cwd, input=input)

        @classmethod
        def compare(cls, a, b):
            """Order routes so that more arguments, then a set cwd, then a set input come first."""
            for candidate in [
                len(b.args) - len(a.args),
                0 if type(a.cwd) == type(b.cwd) else -1 if a.cwd else 1,
                0 if type(a.input) == type(b.input) else -1 if a.input else 1,
            ]:
                if candidate:
                    return candidate
            return 0

    Route = CommandRoute

    @classmethod
    def completion_generator_for(cls, program):
        """
        Walk the mocked Subprocess stack from the top and return every route for program.

        For an ordered context only the first remaining route is considered. Raises
        FileNotFoundError (Python 3) or OSError (Python 2) when nothing mocks program.
        """
        current = cls.top
        candidates = []
        while current:
            for completion in current.completions:
                if completion.args[0] == program:
                    candidates.append(completion)
                if current.ordered:
                    break
            current = current.previous

        if candidates:
            return candidates

        if sys.version_info > (3, 0):
            raise FileNotFoundError("No such file or directory: '{path}': '{path}'".format(path=program))
        raise OSError('[Errno 2] No such file or directory')

    @classmethod
    def completion_for(cls, *args, **kwargs):
        """
        Find the first route matching the arguments and return the completion it generates.

        A route at the head of an ordered context is consumed. Raises AssertionError when
        the program is mocked but no route matches the provided arguments.
        """
        candidates = [
            candidate for candidate in cls.completion_generator_for(args[0]) if candidate.matches(*args, **kwargs)
        ]
        if not candidates:
            raise AssertionError('Provided arguments to {} do not match a provided completion'.format(args[0]))

        completion = candidates[0]
        current = cls.top
        while current:
            # An exhausted ordered context has no head route left; skip it instead of indexing into it.
            if current.ordered and current.completions and completion is current.completions[0]:
                current.completions.pop(0)
                break
            current = current.previous
        return completion(*args, **kwargs)

    def __init__(self, *args, **kwargs):
        """
        Accept either CommandRoute objects (plus an optional ``ordered`` keyword) or the
        arguments of a single implied CommandRoute. Mixing both raises TypeError.
        """
        if all(isinstance(arg, self.CommandRoute) for arg in args):
            self.ordered = kwargs.pop('ordered', False)
            if kwargs:
                raise TypeError('__init__() got an unexpected keyword argument {}'.format(next(iter(kwargs))))
            self.completions = list(args) if self.ordered else sorted(args, key=cmp_to_key(self.CommandRoute.compare))
        elif any(isinstance(arg, self.CommandRoute) for arg in args):
            raise TypeError('mocks.Subprocess arguments must be of a consistent type')
        else:
            self.ordered = False
            self.completions = [self.CommandRoute(*args, **kwargs)]

        super(Subprocess, self).__init__(cls=Subprocess)

        # Allow mock to be managed via autoinstall
        from mock import patch
        from webkitcorepy.mocks.popen import Popen
        self.patches.append(patch('subprocess.Popen', new=Popen))
Added: trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/tests/mocks/subprocess_unittest.py (0 => 266190)
--- trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/tests/mocks/subprocess_unittest.py (rev 0)
+++ trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/tests/mocks/subprocess_unittest.py 2020-08-26 21:01:19 UTC (rev 266190)
@@ -0,0 +1,152 @@
+# Copyright (C) 2020 Apple Inc. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' AND ANY
+# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY
+# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import subprocess
+import unittest
+
+from webkitcorepy import BytesIO, OutputCapture, mocks, run
+
+
class MockSubprocess(unittest.TestCase):
    """Tests for mocks.Subprocess routing, driven through run() and subprocess.Popen."""

    # Routes reused by several tests, including other test cases in this file.
    LS = mocks.Subprocess.Route(
        'ls',
        completion=mocks.ProcessCompletion(returncode=0, stdout='file1.txt\nfile2.txt\n'),
    )
    SLEEP = mocks.Subprocess.Route(
        'sleep',
        generator=lambda *args, **kwargs: mocks.ProcessCompletion(returncode=0, elapsed=int(args[1])),
    )

    def test_documentation(self):
        # Mirrors the examples in the README; keep the two in sync.
        with OutputCapture():
            with mocks.Subprocess(
                'ls', completion=mocks.ProcessCompletion(returncode=0, stdout='file1.txt\nfile2.txt\n'),
            ):
                listing = run(['ls'], capture_output=True, encoding='utf-8')
                assert listing.returncode == 0
                assert listing.stdout == 'file1.txt\nfile2.txt\n'

            with mocks.Subprocess(
                'ls', completion=mocks.ProcessCompletion(returncode=0, stdout='file1.txt\nfile2.txt\n'),
            ):
                assert subprocess.check_output(['ls']) == b'file1.txt\nfile2.txt\n'
                assert subprocess.check_call(['ls']) == 0

            with mocks.Subprocess(
                mocks.Subprocess.CommandRoute('command-a', 'argument', completion=mocks.ProcessCompletion(returncode=0)),
                mocks.Subprocess.CommandRoute('command-b', completion=mocks.ProcessCompletion(returncode=-1)),
            ):
                first = run(['command-a', 'argument'])
                assert first.returncode == 0

                second = run(['command-b'])
                assert second.returncode == -1

    def test_no_file(self):
        with mocks.Subprocess(self.LS, self.SLEEP), self.assertRaises(OSError):
            run(['invalid-file'])

    def test_implied_route(self):
        with mocks.Subprocess('command', completion=mocks.ProcessCompletion(returncode=0)):
            completed = run(['command'])
            self.assertEqual(completed.returncode, 0)

            with self.assertRaises(OSError):
                run(['invalid-file'])

    def test_popen(self):
        with mocks.Time, mocks.Subprocess(self.LS, self.SLEEP):
            listing = subprocess.Popen(['ls'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            listing.wait()
            self.assertEqual(0, listing.poll())
            self.assertEqual(b'file1.txt\nfile2.txt\n', listing.stdout.read())

            sleeper = subprocess.Popen(['sleep', '1'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            self.assertEqual(None, sleeper.poll())
            sleeper.wait()
            self.assertEqual(0, sleeper.poll())

    def test_ordered(self):
        with OutputCapture():
            with mocks.Subprocess(
                mocks.Subprocess.Route('command', completion=mocks.ProcessCompletion(returncode=0)),
                mocks.Subprocess.Route('command', completion=mocks.ProcessCompletion(returncode=1)),
                ordered=True,
            ):
                # Ordered routes are consumed one call at a time.
                self.assertEqual(run(['command']).returncode, 0)
                self.assertEqual(run(['command']).returncode, 1)
                with self.assertRaises(OSError):
                    run(['command'])

    def test_argument_priority(self):
        with OutputCapture():
            with mocks.Subprocess(
                mocks.Subprocess.Route('command', '--help', completion=mocks.ProcessCompletion(returncode=0)),
                mocks.Subprocess.Route('command', completion=mocks.ProcessCompletion(returncode=1)),
            ):
                self.assertEqual(run(['command']).returncode, 1)
                self.assertEqual(run(['command', '--help']).returncode, 0)

    def test_cwd_priority(self):
        with OutputCapture():
            with mocks.Subprocess(
                mocks.Subprocess.Route('command', completion=mocks.ProcessCompletion(returncode=0), cwd='/example'),
                mocks.Subprocess.Route('command', completion=mocks.ProcessCompletion(returncode=1)),
            ):
                self.assertEqual(run(['command']).returncode, 1)
                self.assertEqual(run(['command'], cwd='/example').returncode, 0)

    def test_input_priority(self):
        with OutputCapture():
            with mocks.Subprocess(
                mocks.Subprocess.Route('command', completion=mocks.ProcessCompletion(returncode=0), input='stdin'),
                mocks.Subprocess.Route('command', completion=mocks.ProcessCompletion(returncode=1)),
            ):
                self.assertEqual(run(['command']).returncode, 1)
                self.assertEqual(run(['command'], input='stdin').returncode, 0)
                self.assertEqual(run(['command'], stdin=BytesIO(b'stdin')).returncode, 0)
+
+
class MockCheckOutput(unittest.TestCase):
    """subprocess.check_output goes through the mocked Popen."""

    def test_popen(self):
        with mocks.Subprocess(MockSubprocess.LS):
            output = subprocess.check_output(['ls'])
            self.assertEqual(output, b'file1.txt\nfile2.txt\n')
+
+
class MockCheckCall(unittest.TestCase):
    """subprocess.check_call goes through the mocked Popen and writes to the captured stdout."""

    def test_popen(self):
        with OutputCapture() as captured, mocks.Subprocess(MockSubprocess.LS):
            status = subprocess.check_call(['ls'])
            self.assertEqual(status, 0)

        self.assertEqual(captured.stdout.getvalue(), 'file1.txt\nfile2.txt\n')
+
+
class MockRun(unittest.TestCase):
    """run() without captured output goes through the mocked Popen and writes to the captured stdout."""

    def test_popen(self):
        with OutputCapture() as captured, mocks.Subprocess(MockSubprocess.LS):
            completed = run(['ls'])
            self.assertEqual(completed.returncode, 0)
            self.assertEqual(completed.stdout, None)
            self.assertEqual(completed.stderr, None)

        self.assertEqual(captured.stdout.getvalue(), 'file1.txt\nfile2.txt\n')