Author: Ronny Pfannschmidt <[email protected]>
Branch: refine-testrunner
Changeset: r55880:67b927e13d58
Date: 2012-07-01 15:35 +0200
http://bitbucket.org/pypy/pypy/changeset/67b927e13d58/
Log: create a testrunner util module and move simple functions there
diff --git a/pytest.py b/pytest.py
--- a/pytest.py
+++ b/pytest.py
@@ -32,6 +32,9 @@
---> When pypy/__init__.py becomes empty again, we have reached stage 2.
""")
+import warnings
+warnings.filterwarnings('ignore', module='_pytest.core')
+
from _pytest.core import main, UsageError, _preloadplugins
from _pytest import core as cmdline
from _pytest import __version__
diff --git a/testrunner/runner.py b/testrunner/runner.py
--- a/testrunner/runner.py
+++ b/testrunner/runner.py
@@ -1,110 +1,15 @@
import sys, os, signal, thread, Queue, time
import py
-import subprocess, optparse
+import util
-if sys.platform == 'win32':
- PROCESS_TERMINATE = 0x1
- try:
- import win32api, pywintypes
- except ImportError:
- def _kill(pid, sig):
- import ctypes
- winapi = ctypes.windll.kernel32
- proch = winapi.OpenProcess(PROCESS_TERMINATE, 0, pid)
- winapi.TerminateProcess(proch, 1) == 1
- winapi.CloseHandle(proch)
- else:
- def _kill(pid, sig):
- try:
- proch = win32api.OpenProcess(PROCESS_TERMINATE, 0, pid)
- win32api.TerminateProcess(proch, 1)
- win32api.CloseHandle(proch)
- except pywintypes.error, e:
- pass
- #Try to avoid opeing a dialog box if one of the tests causes a system error
- import ctypes
- winapi = ctypes.windll.kernel32
- SetErrorMode = winapi.SetErrorMode
- SetErrorMode.argtypes=[ctypes.c_int]
- SEM_FAILCRITICALERRORS = 1
- SEM_NOGPFAULTERRORBOX = 2
- SEM_NOOPENFILEERRORBOX = 0x8000
- flags = SEM_FAILCRITICALERRORS | SEM_NOGPFAULTERRORBOX |
SEM_NOOPENFILEERRORBOX
- #Since there is no GetErrorMode, do a double Set
- old_mode = SetErrorMode(flags)
- SetErrorMode(old_mode | flags)
+import optparse
- SIGKILL = SIGTERM = 0
- READ_MODE = 'rU'
- WRITE_MODE = 'wb'
-else:
- def _kill(pid, sig):
- try:
- os.kill(pid, sig)
- except OSError:
- pass
+READ_MODE = 'rU'
+WRITE_MODE = 'wb'
- SIGKILL = signal.SIGKILL
- SIGTERM = signal.SIGTERM
- READ_MODE = 'r'
- WRITE_MODE = 'w'
-EXECUTEFAILED = -1001
-RUNFAILED = -1000
-TIMEDOUT = -999
-def busywait(p, timeout):
- t0 = time.time()
- delay = 0.5
- while True:
- time.sleep(delay)
- returncode = p.poll()
- if returncode is not None:
- return returncode
- tnow = time.time()
- if (tnow-t0) >= timeout:
- return None
- delay = min(delay * 1.15, 7.2)
-
-def run(args, cwd, out, timeout=None):
- f = out.open('w')
- try:
- try:
- p = subprocess.Popen(args, cwd=str(cwd), stdout=f, stderr=f)
- except Exception, e:
- f.write("Failed to run %s with cwd='%s' timeout=%s:\n"
- " %s\n"
- % (args, cwd, timeout, e))
- return RUNFAILED
-
- if timeout is None:
- return p.wait()
- else:
- returncode = busywait(p, timeout)
- if returncode is not None:
- return returncode
- # timeout!
- _kill(p.pid, SIGTERM)
- if busywait(p, 10) is None:
- _kill(p.pid, SIGKILL)
- return TIMEDOUT
- finally:
- f.close()
-
-def dry_run(args, cwd, out, timeout=None):
- f = out.open('w')
- try:
- f.write("run %s with cwd='%s' timeout=%s\n" % (args, cwd, timeout))
- finally:
- f.close()
- return 0
-
-def getsignalname(n):
- for name, value in signal.__dict__.items():
- if value == n and name.startswith('SIG'):
- return name
- return 'signal %d' % (n,)
def execute_test(cwd, test, out, logfname, interp, test_driver,
do_dry_run=False, timeout=None,
@@ -122,46 +27,14 @@
args[0] = os.path.join(str(cwd), interp0)
if do_dry_run:
- runfunc = dry_run
+ runfunc = util.dry_run
else:
- runfunc = run
+ runfunc = util.run
exitcode = runfunc(args, cwd, out, timeout=timeout)
return exitcode
-def should_report_failure(logdata):
- # When we have an exitcode of 1, it might be because of failures
- # that occurred "regularly", or because of another crash of py.test.
- # We decide heuristically based on logdata: if it looks like it
- # contains "F", "E" or "P" then it's a regular failure, otherwise
- # we have to report it.
- for line in logdata.splitlines():
- if (line.startswith('F ') or
- line.startswith('E ') or
- line.startswith('P ')):
- return False
- return True
-
-def interpret_exitcode(exitcode, test, logdata=""):
- extralog = ""
- if exitcode:
- failure = True
- if exitcode != 1 or should_report_failure(logdata):
- if exitcode > 0:
- msg = "Exit code %d." % exitcode
- elif exitcode == TIMEDOUT:
- msg = "TIMEOUT"
- elif exitcode == RUNFAILED:
- msg = "Failed to run interp"
- elif exitcode == EXECUTEFAILED:
- msg = "Failed with exception in execute-test"
- else:
- msg = "Killed by %s." % getsignalname(-exitcode)
- extralog = "! %s\n %s\n" % (test, msg)
- else:
- failure = False
- return failure, extralog
def worker(num, n, run_param, testdirs, result_queue):
sessdir = run_param.sessdir
@@ -195,7 +68,7 @@
print "execute-test for %r failed with:" % test
import traceback
traceback.print_exc()
- exitcode = EXECUTEFAILED
+ exitcode = util.EXECUTEFAILED
if one_output.check(file=1):
output = one_output.read(READ_MODE)
@@ -206,7 +79,7 @@
else:
logdata = ""
- failure, extralog = interpret_exitcode(exitcode, test, logdata)
+ failure, extralog = util.interpret_exitcode(exitcode, test, logdata)
if extralog:
logdata += extralog
diff --git a/testrunner/test/test_runner.py b/testrunner/test/test_runner.py
--- a/testrunner/test/test_runner.py
+++ b/testrunner/test/test_runner.py
@@ -1,115 +1,27 @@
import py, sys, os, signal, cStringIO, tempfile
import runner
+import util
import pypy
pytest_script = py.path.local(pypy.__file__).dirpath('test_all.py')
-def test_busywait():
- class FakeProcess:
- def poll(self):
- if timers[0] >= timers[1]:
- return 42
- return None
- class FakeTime:
- def sleep(self, delay):
- timers[0] += delay
- def time(self):
- timers[2] += 1
- return 12345678.9 + timers[0]
- p = FakeProcess()
- prevtime = runner.time
- try:
- runner.time = FakeTime()
- #
- timers = [0.0, 0.0, 0]
- returncode = runner.busywait(p, 10)
- assert returncode == 42 and 0.0 <= timers[0] <= 1.0
- #
- timers = [0.0, 3.0, 0]
- returncode = runner.busywait(p, 10)
- assert returncode == 42 and 3.0 <= timers[0] <= 5.0 and timers[2] <= 10
- #
- timers = [0.0, 500.0, 0]
- returncode = runner.busywait(p, 1000)
- assert returncode == 42 and 500.0<=timers[0]<=510.0 and timers[2]<=100
- #
- timers = [0.0, 500.0, 0]
- returncode = runner.busywait(p, 100) # get a timeout
- assert returncode == None and 100.0 <= timers[0] <= 110.0
- #
- finally:
- runner.time = prevtime
-
-def test_should_report_failure():
- should_report_failure = runner.should_report_failure
- assert should_report_failure("")
- assert should_report_failure(". Abc\n. Def\n")
- assert should_report_failure("s Ghi\n")
- assert not should_report_failure(". Abc\nF Def\n")
- assert not should_report_failure(". Abc\nE Def\n")
- assert not should_report_failure(". Abc\nP Def\n")
- assert not should_report_failure("F Def\n. Ghi\n. Jkl\n")
-
-
-
-class TestRunHelper(object):
- def pytest_funcarg__out(self, request):
- tmpdir = request.getfuncargvalue('tmpdir')
- return tmpdir.ensure('out')
-
- def test_run(self, out):
- res = runner.run([sys.executable, "-c", "print 42"], '.', out)
- assert res == 0
- assert out.read() == "42\n"
-
- def test_error(self, out):
- res = runner.run([sys.executable, "-c", "import sys; sys.exit(3)"],
'.', out)
- assert res == 3
-
- def test_signal(self, out):
- if sys.platform == 'win32':
- py.test.skip("no death by signal on windows")
- res = runner.run([sys.executable, "-c", "import os;
os.kill(os.getpid(), 9)"], '.', out)
- assert res == -9
-
- def test_timeout(self, out):
- res = runner.run([sys.executable, "-c", "while True: pass"], '.', out,
timeout=3)
- assert res == -999
-
- def test_timeout_lock(self, out):
- res = runner.run([sys.executable, "-c", "import threading;
l=threading.Lock(); l.acquire(); l.acquire()"], '.', out, timeout=3)
- assert res == -999
-
- def test_timeout_syscall(self, out):
- res = runner.run([sys.executable, "-c", "import socket; s=s =
socket.socket(socket.AF_INET, socket.SOCK_DGRAM); s.bind(('', 0));
s.recv(1000)"], '.', out, timeout=3)
- assert res == -999
-
- def test_timeout_success(self, out):
- res = runner.run([sys.executable, "-c", "print 42"], '.',
- out, timeout=2)
- assert res == 0
- out = out.read()
- assert out == "42\n"
class TestExecuteTest(object):
- def setup_class(cls):
- cls.real_run = (runner.run,)
- cls.called = []
- cls.exitcode = [0]
-
+ def pytest_funcarg__info(self, request):
+ monkeypatch = request.getfuncargvalue('monkeypatch')
+ info = {'exitcode' : 0}
def fake_run(args, cwd, out, timeout):
- cls.called = (args, cwd, out, timeout)
- return cls.exitcode[0]
- runner.run = fake_run
+ info['called'] = (args, cwd, out, timeout)
+ return info['exitcode']
+ monkeypatch.setattr(util, 'run', fake_run)
+ return info
- def teardown_class(cls):
- runner.run = cls.real_run[0]
- def test_explicit(self):
+ def test_explicit(self, info):
res = runner.execute_test('/wd', 'test_one', 'out', 'LOGFILE',
interp=['INTERP', 'IARG'],
test_driver=['driver', 'darg'],
@@ -123,10 +35,10 @@
'test_one']
- assert self.called == (expected, '/wd', 'out', 'secs')
+ assert info['called'] == (expected, '/wd', 'out', 'secs')
assert res == 0
- def test_explicit_win32(self):
+ def test_explicit_win32(self, info):
res = runner.execute_test('/wd', 'test_one', 'out', 'LOGFILE',
interp=['./INTERP', 'IARG'],
test_driver=['driver', 'darg'],
@@ -140,51 +52,24 @@
'--resultlog=LOGFILE',
'--junitxml=LOGFILE.junit',
'test_one']
- assert self.called[0] == expected
- assert self.called == (expected, '/wd', 'out', 'secs')
+ assert info['called'][0] == expected
+ assert info['called'] == (expected, '/wd', 'out', 'secs')
assert res == 0
- def test_error(self):
- self.exitcode[:] = [1]
+ def test_error(self, info):
+ info['exitcode'] = 1
res = runner.execute_test('/wd', 'test_one', 'out', 'LOGFILE',
interp=['INTERP', 'IARG'],
test_driver=['driver', 'darg'])
assert res == 1
- self.exitcode[:] = [-signal.SIGSEGV]
+ info['exitcode'] = -signal.SIGSEGV
res = runner.execute_test('/wd', 'test_one', 'out', 'LOGFILE',
interp=['INTERP', 'IARG'],
test_driver=['driver', 'darg'])
assert res == -signal.SIGSEGV
- def test_interpret_exitcode(self):
- failure, extralog = runner.interpret_exitcode(0, "test_foo")
- assert not failure
- assert extralog == ""
-
- failure, extralog = runner.interpret_exitcode(1, "test_foo", "")
- assert failure
- assert extralog == """! test_foo
- Exit code 1.
-"""
-
- failure, extralog = runner.interpret_exitcode(1, "test_foo", "F Foo\n")
- assert failure
- assert extralog == ""
-
- failure, extralog = runner.interpret_exitcode(2, "test_foo")
- assert failure
- assert extralog == """! test_foo
- Exit code 2.
-"""
-
- failure, extralog = runner.interpret_exitcode(-signal.SIGSEGV,
- "test_foo")
- assert failure
- assert extralog == """! test_foo
- Killed by SIGSEGV.
-"""
class RunnerTests(object):
with_thread = True
diff --git a/testrunner/test/test_util.py b/testrunner/test/test_util.py
new file mode 100644
--- /dev/null
+++ b/testrunner/test/test_util.py
@@ -0,0 +1,118 @@
+import util
+import runner
+
+def test_busywait():
+ class FakeProcess:
+ def poll(self):
+ if timers[0] >= timers[1]:
+ return 42
+ return None
+ class FakeTime:
+ def sleep(self, delay):
+ timers[0] += delay
+ def time(self):
+ timers[2] += 1
+ return 12345678.9 + timers[0]
+ p = FakeProcess()
+ prevtime = runner.time
+ try:
+ runner.time = FakeTime()
+ #
+ timers = [0.0, 0.0, 0]
+ returncode = util.busywait(p, 10)
+ assert returncode == 42 and 0.0 <= timers[0] <= 1.0
+ #
+ timers = [0.0, 3.0, 0]
+ returncode = util.busywait(p, 10)
+ assert returncode == 42 and 3.0 <= timers[0] <= 5.0 and timers[2] <= 10
+ #
+ timers = [0.0, 500.0, 0]
+ returncode = util.busywait(p, 1000)
+ assert returncode == 42 and 500.0<=timers[0]<=510.0 and timers[2]<=100
+ #
+ timers = [0.0, 500.0, 0]
+ returncode = util.busywait(p, 100) # get a timeout
+ assert returncode == None and 100.0 <= timers[0] <= 110.0
+ #
+ finally:
+ runner.time = prevtime
+
+def test_should_report_failure():
+ should_report_failure = util.should_report_failure
+ assert should_report_failure("")
+ assert should_report_failure(". Abc\n. Def\n")
+ assert should_report_failure("s Ghi\n")
+ assert not should_report_failure(". Abc\nF Def\n")
+ assert not should_report_failure(". Abc\nE Def\n")
+ assert not should_report_failure(". Abc\nP Def\n")
+ assert not should_report_failure("F Def\n. Ghi\n. Jkl\n")
+
+
+class TestRunHelper(object):
+ def pytest_funcarg__out(self, request):
+ tmpdir = request.getfuncargvalue('tmpdir')
+ return tmpdir.ensure('out')
+
+ def test_run(self, out):
+ res = util.run([sys.executable, "-c", "print 42"], '.', out)
+ assert res == 0
+ assert out.read() == "42\n"
+
+ def test_error(self, out):
+ res = util.run([sys.executable, "-c", "import sys; sys.exit(3)"], '.',
out)
+ assert res == 3
+
+ def test_signal(self, out):
+ if sys.platform == 'win32':
+ py.test.skip("no death by signal on windows")
+ res = util.run([sys.executable, "-c", "import os; os.kill(os.getpid(),
9)"], '.', out)
+ assert res == -9
+
+ def test_timeout(self, out):
+ res = util.run([sys.executable, "-c", "while True: pass"], '.', out,
timeout=3)
+ assert res == -999
+
+ def test_timeout_lock(self, out):
+ res = util.run([sys.executable, "-c", "import threading;
l=threading.Lock(); l.acquire(); l.acquire()"], '.', out, timeout=3)
+ assert res == -999
+
+ def test_timeout_syscall(self, out):
+ res = util.run([sys.executable, "-c", "import socket; s=s =
socket.socket(socket.AF_INET, socket.SOCK_DGRAM); s.bind(('', 0));
s.recv(1000)"], '.', out, timeout=3)
+ assert res == -999
+
+ def test_timeout_success(self, out):
+ res = util.run([sys.executable, "-c", "print 42"], '.',
+ out, timeout=2)
+ assert res == 0
+ out = out.read()
+ assert out == "42\n"
+
+
+
+def test_interpret_exitcode():
+ failure, extralog = util.interpret_exitcode(0, "test_foo")
+ assert not failure
+ assert extralog == ""
+
+ failure, extralog = runner.interpret_exitcode(1, "test_foo", "")
+ assert failure
+ assert extralog == """! test_foo
+Exit code 1.
+"""
+
+ failure, extralog = runner.interpret_exitcode(1, "test_foo", "F Foo\n")
+ assert failure
+ assert extralog == ""
+
+ failure, extralog = runner.interpret_exitcode(2, "test_foo")
+ assert failure
+ assert extralog == """! test_foo
+Exit code 2.
+"""
+
+ failure, extralog = runner.interpret_exitcode(-signal.SIGSEGV,
+ "test_foo")
+ assert failure
+ assert extralog == """! test_foo
+ Killed by SIGSEGV.
+"""
diff --git a/testrunner/util.py b/testrunner/util.py
new file mode 100644
--- /dev/null
+++ b/testrunner/util.py
@@ -0,0 +1,142 @@
+import sys
+import os
+import subprocess
+import signal
+import time
+
+if sys.platform == 'win32':
+ PROCESS_TERMINATE = 0x1
+ try:
+ import win32api, pywintypes
+ except ImportError:
+ def _kill(pid, sig):
+ import ctypes
+ winapi = ctypes.windll.kernel32
+ proch = winapi.OpenProcess(PROCESS_TERMINATE, 0, pid)
+ winapi.TerminateProcess(proch, 1) == 1
+ winapi.CloseHandle(proch)
+ else:
+ def _kill(pid, sig):
+ try:
+ proch = win32api.OpenProcess(PROCESS_TERMINATE, 0, pid)
+ win32api.TerminateProcess(proch, 1)
+ win32api.CloseHandle(proch)
+ except pywintypes.error:
+ pass
+ #Try to avoid opeing a dialog box if one of the tests causes a system error
+ import ctypes
+ winapi = ctypes.windll.kernel32
+ SetErrorMode = winapi.SetErrorMode
+ SetErrorMode.argtypes=[ctypes.c_int]
+
+ SEM_FAILCRITICALERRORS = 1
+ SEM_NOGPFAULTERRORBOX = 2
+ SEM_NOOPENFILEERRORBOX = 0x8000
+ flags = SEM_FAILCRITICALERRORS | SEM_NOGPFAULTERRORBOX |
SEM_NOOPENFILEERRORBOX
+ #Since there is no GetErrorMode, do a double Set
+ old_mode = SetErrorMode(flags)
+ SetErrorMode(old_mode | flags)
+
+ SIGKILL = SIGTERM = 0
+else:
+ def _kill(pid, sig):
+ try:
+ os.kill(pid, sig)
+ except OSError:
+ pass
+
+ SIGKILL = signal.SIGKILL
+ SIGTERM = signal.SIGTERM
+
+
+EXECUTEFAILED = -1001
+RUNFAILED = -1000
+TIMEDOUT = -999
+
+
+def getsignalname(n):
+ for name, value in signal.__dict__.items():
+ if value == n and name.startswith('SIG'):
+ return name
+ return 'signal %d' % (n,)
+
+
+def should_report_failure(logdata):
+ # When we have an exitcode of 1, it might be because of failures
+ # that occurred "regularly", or because of another crash of py.test.
+ # We decide heuristically based on logdata: if it looks like it
+ # contains "F", "E" or "P" then it's a regular failure, otherwise
+ # we have to report it.
+ for line in logdata.splitlines():
+ if (line.startswith('F ') or
+ line.startswith('E ') or
+ line.startswith('P ')):
+ return False
+ return True
+
+
+
+def busywait(p, timeout):
+ t0 = time.time()
+ delay = 0.5
+ while True:
+ time.sleep(delay)
+ returncode = p.poll()
+ if returncode is not None:
+ return returncode
+ tnow = time.time()
+ if (tnow-t0) >= timeout:
+ return None
+ delay = min(delay * 1.15, 7.2)
+
+
+
+def interpret_exitcode(exitcode, test, logdata=""):
+ extralog = ""
+ if exitcode:
+ failure = True
+ if exitcode != 1 or should_report_failure(logdata):
+ if exitcode > 0:
+ msg = "Exit code %d." % exitcode
+ elif exitcode == TIMEDOUT:
+ msg = "TIMEOUT"
+ elif exitcode == RUNFAILED:
+ msg = "Failed to run interp"
+ elif exitcode == EXECUTEFAILED:
+ msg = "Failed with exception in execute-test"
+ else:
+ msg = "Killed by %s." % getsignalname(-exitcode)
+ extralog = "! %s\n %s\n" % (test, msg)
+ else:
+ failure = False
+ return failure, extralog
+
+
+
+def run(args, cwd, out, timeout=None):
+ with out.open('w') as f:
+ try:
+ p = subprocess.Popen(args, cwd=str(cwd), stdout=f, stderr=f)
+ except Exception, e:
+ f.write("Failed to run %s with cwd='%s' timeout=%s:\n"
+ " %s\n"
+ % (args, cwd, timeout, e))
+ return RUNFAILED
+
+ if timeout is None:
+ return p.wait()
+ else:
+ returncode = busywait(p, timeout)
+ if returncode is not None:
+ return returncode
+ # timeout!
+ _kill(p.pid, SIGTERM)
+ if busywait(p, 10) is None:
+ _kill(p.pid, SIGKILL)
+ return TIMEDOUT
+
+
+def dry_run(args, cwd, out, timeout=None):
+ with out.open('w') as f:
+ f.write("run %s with cwd='%s' timeout=%s\n" % (args, cwd, timeout))
+ return 0
_______________________________________________
pypy-commit mailing list
[email protected]
http://mail.python.org/mailman/listinfo/pypy-commit