Author: Philip Jenvey <pjen...@underboss.org> Branch: py3k Changeset: r72756:87910b468690 Date: 2014-08-11 17:12 -0700 http://bitbucket.org/pypy/pypy/changeset/87910b468690/
Log: merge default diff too long, truncating to 2000 out of 10648 lines diff --git a/_pytest/__init__.py b/_pytest/__init__.py --- a/_pytest/__init__.py +++ b/_pytest/__init__.py @@ -1,2 +1,2 @@ # -__version__ = '2.2.4.dev2' +__version__ = '2.5.2' diff --git a/_pytest/_argcomplete.py b/_pytest/_argcomplete.py new file mode 100644 --- /dev/null +++ b/_pytest/_argcomplete.py @@ -0,0 +1,104 @@ + +"""allow bash-completion for argparse with argcomplete if installed +needs argcomplete>=0.5.6 for python 3.2/3.3 (older versions fail +to find the magic string, so _ARGCOMPLETE env. var is never set, and +this does not need special code. + +argcomplete does not support python 2.5 (although the changes for that +are minor). + +Function try_argcomplete(parser) should be called directly before +the call to ArgumentParser.parse_args(). + +The filescompleter is what you normally would use on the positional +arguments specification, in order to get "dirname/" after "dirn<TAB>" +instead of the default "dirname ": + + optparser.add_argument(Config._file_or_dir, nargs='*' + ).completer=filescompleter + +Other, application specific, completers should go in the file +doing the add_argument calls as they need to be specified as .completer +attributes as well. (If argcomplete is not installed, the function the +attribute points to will not be used). + +SPEEDUP +======= +The generic argcomplete script for bash-completion +(/etc/bash_completion.d/python-argcomplete.sh ) +uses a python program to determine startup script generated by pip. +You can speed up completion somewhat by changing this script to include + # PYTHON_ARGCOMPLETE_OK +so the the python-argcomplete-check-easy-install-script does not +need to be called to find the entry point of the code and see if that is +marked with PYTHON_ARGCOMPLETE_OK + +INSTALL/DEBUGGING +================= +To include this support in another application that has setup.py generated +scripts: +- add the line: + # PYTHON_ARGCOMPLETE_OK + near the top of the main python entry point +- include in the file calling parse_args(): + from _argcomplete import try_argcomplete, filescompleter + , call try_argcomplete just before parse_args(), and optionally add + filescompleter to the positional arguments' add_argument() +If things do not work right away: +- switch on argcomplete debugging with (also helpful when doing custom + completers): + export _ARC_DEBUG=1 +- run: + python-argcomplete-check-easy-install-script $(which appname) + echo $? + will echo 0 if the magic line has been found, 1 if not +- sometimes it helps to find early on errors using: + _ARGCOMPLETE=1 _ARC_DEBUG=1 appname + which should throw a KeyError: 'COMPLINE' (which is properly set by the + global argcomplete script). +""" + +import sys +import os +from glob import glob + +class FastFilesCompleter: + 'Fast file completer class' + def __init__(self, directories=True): + self.directories = directories + + def __call__(self, prefix, **kwargs): + """only called on non option completions""" + if os.path.sep in prefix[1:]: # + prefix_dir = len(os.path.dirname(prefix) + os.path.sep) + else: + prefix_dir = 0 + completion = [] + globbed = [] + if '*' not in prefix and '?' not in prefix: + if prefix[-1] == os.path.sep: # we are on unix, otherwise no bash + globbed.extend(glob(prefix + '.*')) + prefix += '*' + globbed.extend(glob(prefix)) + for x in sorted(globbed): + if os.path.isdir(x): + x += '/' + # append stripping the prefix (like bash, not like compgen) + completion.append(x[prefix_dir:]) + return completion + +if os.environ.get('_ARGCOMPLETE'): + # argcomplete 0.5.6 is not compatible with python 2.5.6: print/with/format + if sys.version_info[:2] < (2, 6): + sys.exit(1) + try: + import argcomplete.completers + except ImportError: + sys.exit(-1) + filescompleter = FastFilesCompleter() + + def try_argcomplete(parser): + argcomplete.autocomplete(parser) +else: + def try_argcomplete(parser): pass + filescompleter = None diff --git a/_pytest/assertion/__init__.py b/_pytest/assertion/__init__.py --- a/_pytest/assertion/__init__.py +++ b/_pytest/assertion/__init__.py @@ -3,7 +3,6 @@ """ import py import sys -import pytest from _pytest.monkeypatch import monkeypatch from _pytest.assertion import util @@ -19,8 +18,8 @@ to provide assert expression information. """) group.addoption('--no-assert', action="store_true", default=False, dest="noassert", help="DEPRECATED equivalent to --assert=plain") - group.addoption('--nomagic', action="store_true", default=False, - dest="nomagic", help="DEPRECATED equivalent to --assert=plain") + group.addoption('--nomagic', '--no-magic', action="store_true", + default=False, help="DEPRECATED equivalent to --assert=plain") class AssertionState: """State for the assertion plugin.""" @@ -35,22 +34,25 @@ mode = "plain" if mode == "rewrite": try: - import ast + import ast # noqa except ImportError: mode = "reinterp" else: - if sys.platform.startswith('java'): + # Both Jython and CPython 2.6.0 have AST bugs that make the + # assertion rewriting hook malfunction. + if (sys.platform.startswith('java') or + sys.version_info[:3] == (2, 6, 0)): mode = "reinterp" if mode != "plain": _load_modules(mode) m = monkeypatch() config._cleanup.append(m.undo) m.setattr(py.builtin.builtins, 'AssertionError', - reinterpret.AssertionError) + reinterpret.AssertionError) # noqa hook = None if mode == "rewrite": - hook = rewrite.AssertionRewritingHook() - sys.meta_path.append(hook) + hook = rewrite.AssertionRewritingHook() # noqa + sys.meta_path.insert(0, hook) warn_about_missing_assertion(mode) config._assertstate = AssertionState(config, mode) config._assertstate.hook = hook @@ -73,9 +75,16 @@ def callbinrepr(op, left, right): hook_result = item.ihook.pytest_assertrepr_compare( config=item.config, op=op, left=left, right=right) + for new_expl in hook_result: if new_expl: - res = '\n~'.join(new_expl) + # Don't include pageloads of data unless we are very + # verbose (-vv) + if (sum(len(p) for p in new_expl[1:]) > 80*8 + and item.config.option.verbose < 2): + new_expl[1:] = [py.builtin._totext( + 'Detailed information truncated, use "-vv" to show')] + res = py.builtin._totext('\n~').join(new_expl) if item.config.getvalue("assertmode") == "rewrite": # The result will be fed back a python % formatting # operation, which will fail if there are extraneous @@ -95,9 +104,9 @@ def _load_modules(mode): """Lazily import assertion related code.""" global rewrite, reinterpret - from _pytest.assertion import reinterpret + from _pytest.assertion import reinterpret # noqa if mode == "rewrite": - from _pytest.assertion import rewrite + from _pytest.assertion import rewrite # noqa def warn_about_missing_assertion(mode): try: diff --git a/_pytest/assertion/newinterpret.py b/_pytest/assertion/newinterpret.py --- a/_pytest/assertion/newinterpret.py +++ b/_pytest/assertion/newinterpret.py @@ -11,7 +11,7 @@ from _pytest.assertion.reinterpret import BuiltinAssertionError -if sys.platform.startswith("java") and sys.version_info < (2, 5, 2): +if sys.platform.startswith("java"): # See http://bugs.jython.org/issue1497 _exprs = ("BoolOp", "BinOp", "UnaryOp", "Lambda", "IfExp", "Dict", "ListComp", "GeneratorExp", "Yield", "Compare", "Call", diff --git a/_pytest/assertion/oldinterpret.py b/_pytest/assertion/oldinterpret.py --- a/_pytest/assertion/oldinterpret.py +++ b/_pytest/assertion/oldinterpret.py @@ -526,10 +526,13 @@ # example: def f(): return 5 + def g(): return 3 + def h(x): return 'never' + check("f() * g() == 5") check("not f()") check("not (f() and g() or 0)") diff --git a/_pytest/assertion/reinterpret.py b/_pytest/assertion/reinterpret.py --- a/_pytest/assertion/reinterpret.py +++ b/_pytest/assertion/reinterpret.py @@ -1,18 +1,26 @@ import sys import py from _pytest.assertion.util import BuiltinAssertionError +u = py.builtin._totext + class AssertionError(BuiltinAssertionError): def __init__(self, *args): BuiltinAssertionError.__init__(self, *args) if args: + # on Python2.6 we get len(args)==2 for: assert 0, (x,y) + # on Python2.7 and above we always get len(args) == 1 + # with args[0] being the (x,y) tuple. + if len(args) > 1: + toprint = args + else: + toprint = args[0] try: - self.msg = str(args[0]) - except py.builtin._sysex: - raise - except: - self.msg = "<[broken __repr__] %s at %0xd>" %( - args[0].__class__, id(args[0])) + self.msg = u(toprint) + except Exception: + self.msg = u( + "<[broken __repr__] %s at %0xd>" + % (toprint.__class__, id(toprint))) else: f = py.code.Frame(sys._getframe(1)) try: @@ -44,4 +52,3 @@ from _pytest.assertion.newinterpret import interpret as reinterpret else: reinterpret = reinterpret_old - diff --git a/_pytest/assertion/rewrite.py b/_pytest/assertion/rewrite.py --- a/_pytest/assertion/rewrite.py +++ b/_pytest/assertion/rewrite.py @@ -6,6 +6,7 @@ import imp import marshal import os +import re import struct import sys import types @@ -14,13 +15,7 @@ from _pytest.assertion import util -# Windows gives ENOENT in places *nix gives ENOTDIR. -if sys.platform.startswith("win"): - PATH_COMPONENT_NOT_DIR = errno.ENOENT -else: - PATH_COMPONENT_NOT_DIR = errno.ENOTDIR - -# py.test caches rewritten pycs in __pycache__. +# pytest caches rewritten pycs in __pycache__. if hasattr(imp, "get_tag"): PYTEST_TAG = imp.get_tag() + "-PYTEST" else: @@ -34,17 +29,19 @@ PYTEST_TAG = "%s-%s%s-PYTEST" % (impl, ver[0], ver[1]) del ver, impl -PYC_EXT = ".py" + "c" if __debug__ else "o" +PYC_EXT = ".py" + (__debug__ and "c" or "o") PYC_TAIL = "." + PYTEST_TAG + PYC_EXT REWRITE_NEWLINES = sys.version_info[:2] != (2, 7) and sys.version_info < (3, 2) +ASCII_IS_DEFAULT_ENCODING = sys.version_info[0] < 3 class AssertionRewritingHook(object): - """Import hook which rewrites asserts.""" + """PEP302 Import hook which rewrites asserts.""" def __init__(self): self.session = None self.modules = {} + self._register_with_pkg_resources() def set_session(self, session): self.fnpats = session.config.getini("python_files") @@ -59,8 +56,12 @@ names = name.rsplit(".", 1) lastname = names[-1] pth = None - if path is not None and len(path) == 1: - pth = path[0] + if path is not None: + # Starting with Python 3.3, path is a _NamespacePath(), which + # causes problems if not converted to list. + path = list(path) + if len(path) == 1: + pth = path[0] if pth is None: try: fd, fn, desc = imp.find_module(lastname, path) @@ -95,12 +96,13 @@ finally: self.session = sess else: - state.trace("matched test file (was specified on cmdline): %r" % (fn,)) + state.trace("matched test file (was specified on cmdline): %r" % + (fn,)) # The requested module looks like a test file, so rewrite it. This is # the most magical part of the process: load the source, rewrite the # asserts, and load the rewritten source. We also cache the rewritten # module code in a special pyc. We must be aware of the possibility of - # concurrent py.test processes rewriting and loading pycs. To avoid + # concurrent pytest processes rewriting and loading pycs. To avoid # tricky race conditions, we maintain the following invariant: The # cached pyc is always a complete, valid pyc. Operations on it must be # atomic. POSIX's atomic rename comes in handy. @@ -116,19 +118,19 @@ # common case) or it's blocked by a non-dir node. In the # latter case, we'll ignore it in _write_pyc. pass - elif e == PATH_COMPONENT_NOT_DIR: + elif e in [errno.ENOENT, errno.ENOTDIR]: # One of the path components was not a directory, likely # because we're in a zip file. write = False elif e == errno.EACCES: - state.trace("read only directory: %r" % (fn_pypath.dirname,)) + state.trace("read only directory: %r" % fn_pypath.dirname) write = False else: raise cache_name = fn_pypath.basename[:-3] + PYC_TAIL pyc = os.path.join(cache_dir, cache_name) - # Notice that even if we're in a read-only directory, I'm going to check - # for a cached pyc. This may not be optimal... + # Notice that even if we're in a read-only directory, I'm going + # to check for a cached pyc. This may not be optimal... co = _read_pyc(fn_pypath, pyc) if co is None: state.trace("rewriting %r" % (fn,)) @@ -153,27 +155,59 @@ mod.__file__ = co.co_filename # Normally, this attribute is 3.2+. mod.__cached__ = pyc + mod.__loader__ = self py.builtin.exec_(co, mod.__dict__) except: del sys.modules[name] raise return sys.modules[name] -def _write_pyc(co, source_path, pyc): - # Technically, we don't have to have the same pyc format as (C)Python, since - # these "pycs" should never be seen by builtin import. However, there's - # little reason deviate, and I hope sometime to be able to use - # imp.load_compiled to load them. (See the comment in load_module above.) + + + def is_package(self, name): + try: + fd, fn, desc = imp.find_module(name) + except ImportError: + return False + if fd is not None: + fd.close() + tp = desc[2] + return tp == imp.PKG_DIRECTORY + + @classmethod + def _register_with_pkg_resources(cls): + """ + Ensure package resources can be loaded from this loader. May be called + multiple times, as the operation is idempotent. + """ + try: + import pkg_resources + # access an attribute in case a deferred importer is present + pkg_resources.__name__ + except ImportError: + return + + # Since pytest tests are always located in the file system, the + # DefaultProvider is appropriate. + pkg_resources.register_loader_type(cls, pkg_resources.DefaultProvider) + + +def _write_pyc(state, co, source_path, pyc): + # Technically, we don't have to have the same pyc format as + # (C)Python, since these "pycs" should never be seen by builtin + # import. However, there's little reason deviate, and I hope + # sometime to be able to use imp.load_compiled to load them. (See + # the comment in load_module above.) mtime = int(source_path.mtime()) try: fp = open(pyc, "wb") except IOError: err = sys.exc_info()[1].errno - if err == PATH_COMPONENT_NOT_DIR: - # This happens when we get a EEXIST in find_module creating the - # __pycache__ directory and __pycache__ is by some non-dir node. - return False - raise + state.trace("error writing pyc file at %s: errno=%s" %(pyc, err)) + # we ignore any failure to write the cache file + # there are many reasons, permission-denied, __pycache__ being a + # file etc. + return False try: fp.write(imp.get_magic()) fp.write(struct.pack("<l", mtime)) @@ -185,12 +219,43 @@ RN = "\r\n".encode("utf-8") N = "\n".encode("utf-8") +cookie_re = re.compile(r"^[ \t\f]*#.*coding[:=][ \t]*[-\w.]+") +BOM_UTF8 = '\xef\xbb\xbf' + def _rewrite_test(state, fn): """Try to read and rewrite *fn* and return the code object.""" try: source = fn.read("rb") except EnvironmentError: return None + if ASCII_IS_DEFAULT_ENCODING: + # ASCII is the default encoding in Python 2. Without a coding + # declaration, Python 2 will complain about any bytes in the file + # outside the ASCII range. Sadly, this behavior does not extend to + # compile() or ast.parse(), which prefer to interpret the bytes as + # latin-1. (At least they properly handle explicit coding cookies.) To + # preserve this error behavior, we could force ast.parse() to use ASCII + # as the encoding by inserting a coding cookie. Unfortunately, that + # messes up line numbers. Thus, we have to check ourselves if anything + # is outside the ASCII range in the case no encoding is explicitly + # declared. For more context, see issue #269. Yay for Python 3 which + # gets this right. + end1 = source.find("\n") + end2 = source.find("\n", end1 + 1) + if (not source.startswith(BOM_UTF8) and + cookie_re.match(source[0:end1]) is None and + cookie_re.match(source[end1 + 1:end2]) is None): + if hasattr(state, "_indecode"): + return None # encodings imported us again, we don't rewrite + state._indecode = True + try: + try: + source.decode("ascii") + except UnicodeDecodeError: + # Let it fail in real import. + return None + finally: + del state._indecode # On Python versions which are not 2.7 and less than or equal to 3.1, the # parser expects *nix newlines. if REWRITE_NEWLINES: @@ -216,16 +281,16 @@ if sys.platform.startswith("win"): # Windows grants exclusive access to open files and doesn't have atomic # rename, so just write into the final file. - _write_pyc(co, fn, pyc) + _write_pyc(state, co, fn, pyc) else: # When not on windows, assume rename is atomic. Dump the code object # into a file specific to this process and atomically replace it. proc_pyc = pyc + "." + str(os.getpid()) - if _write_pyc(co, fn, proc_pyc): + if _write_pyc(state, co, fn, proc_pyc): os.rename(proc_pyc, pyc) def _read_pyc(source, pyc): - """Possibly read a py.test pyc containing rewritten code. + """Possibly read a pytest pyc containing rewritten code. Return rewritten code if successful or None if not. """ @@ -240,9 +305,8 @@ except EnvironmentError: return None # Check for invalid or out of date pyc file. - if (len(data) != 8 or - data[:4] != imp.get_magic() or - struct.unpack("<l", data[4:])[0] != mtime): + if (len(data) != 8 or data[:4] != imp.get_magic() or + struct.unpack("<l", data[4:])[0] != mtime): return None co = marshal.load(fp) if not isinstance(co, types.CodeType): @@ -259,7 +323,10 @@ _saferepr = py.io.saferepr -from _pytest.assertion.util import format_explanation as _format_explanation +from _pytest.assertion.util import format_explanation as _format_explanation # noqa + +def _should_repr_global_name(obj): + return not hasattr(obj, "__name__") and not py.builtin.callable(obj) def _format_boolop(explanations, is_or): return "(" + (is_or and " or " or " and ").join(explanations) + ")" @@ -280,35 +347,35 @@ unary_map = { - ast.Not : "not %s", - ast.Invert : "~%s", - ast.USub : "-%s", - ast.UAdd : "+%s" + ast.Not: "not %s", + ast.Invert: "~%s", + ast.USub: "-%s", + ast.UAdd: "+%s" } binop_map = { - ast.BitOr : "|", - ast.BitXor : "^", - ast.BitAnd : "&", - ast.LShift : "<<", - ast.RShift : ">>", - ast.Add : "+", - ast.Sub : "-", - ast.Mult : "*", - ast.Div : "/", - ast.FloorDiv : "//", - ast.Mod : "%", - ast.Eq : "==", - ast.NotEq : "!=", - ast.Lt : "<", - ast.LtE : "<=", - ast.Gt : ">", - ast.GtE : ">=", - ast.Pow : "**", - ast.Is : "is", - ast.IsNot : "is not", - ast.In : "in", - ast.NotIn : "not in" + ast.BitOr: "|", + ast.BitXor: "^", + ast.BitAnd: "&", + ast.LShift: "<<", + ast.RShift: ">>", + ast.Add: "+", + ast.Sub: "-", + ast.Mult: "*", + ast.Div: "/", + ast.FloorDiv: "//", + ast.Mod: "%%", # escaped for string formatting + ast.Eq: "==", + ast.NotEq: "!=", + ast.Lt: "<", + ast.LtE: "<=", + ast.Gt: ">", + ast.GtE: ">=", + ast.Pow: "**", + ast.Is: "is", + ast.IsNot: "is not", + ast.In: "in", + ast.NotIn: "not in" } @@ -341,7 +408,7 @@ lineno = 0 for item in mod.body: if (expect_docstring and isinstance(item, ast.Expr) and - isinstance(item.value, ast.Str)): + isinstance(item.value, ast.Str)): doc = item.value.s if "PYTEST_DONT_REWRITE" in doc: # The module has disabled assertion rewriting. @@ -462,7 +529,8 @@ body.append(raise_) # Clear temporary variables by setting them to None. if self.variables: - variables = [ast.Name(name, ast.Store()) for name in self.variables] + variables = [ast.Name(name, ast.Store()) + for name in self.variables] clear = ast.Assign(variables, ast.Name("None", ast.Load())) self.statements.append(clear) # Fix line numbers. @@ -471,11 +539,12 @@ return self.statements def visit_Name(self, name): - # Check if the name is local or not. + # Display the repr of the name if it's a local variable or + # _should_repr_global_name() thinks it's acceptable. locs = ast.Call(self.builtin("locals"), [], [], None, None) - globs = ast.Call(self.builtin("globals"), [], [], None, None) - ops = [ast.In(), ast.IsNot()] - test = ast.Compare(ast.Str(name.id), ops, [locs, globs]) + inlocs = ast.Compare(ast.Str(name.id), [ast.In()], [locs]) + dorepr = self.helper("should_repr_global_name", name) + test = ast.BoolOp(ast.Or(), [inlocs, dorepr]) expr = ast.IfExp(test, self.display(name), ast.Str(name.id)) return name, self.explanation_param(expr) @@ -492,7 +561,8 @@ for i, v in enumerate(boolop.values): if i: fail_inner = [] - self.on_failure.append(ast.If(cond, fail_inner, [])) + # cond is set in a prior loop iteration below + self.on_failure.append(ast.If(cond, fail_inner, [])) # noqa self.on_failure = fail_inner self.push_format_context() res, expl = self.visit(v) @@ -548,7 +618,8 @@ new_kwarg, expl = self.visit(call.kwargs) arg_expls.append("**" + expl) expl = "%s(%s)" % (func_expl, ', '.join(arg_expls)) - new_call = ast.Call(new_func, new_args, new_kwargs, new_star, new_kwarg) + new_call = ast.Call(new_func, new_args, new_kwargs, + new_star, new_kwarg) res = self.assign(new_call) res_expl = self.explanation_param(self.display(res)) outer_expl = "%s\n{%s = %s\n}" % (res_expl, res_expl, expl) @@ -584,7 +655,7 @@ res_expr = ast.Compare(left_res, [op], [next_res]) self.statements.append(ast.Assign([store_names[i]], res_expr)) left_res, left_expl = next_res, next_expl - # Use py.code._reprcompare if that's available. + # Use pytest.assertion.util._reprcompare if that's available. expl_call = self.helper("call_reprcompare", ast.Tuple(syms, ast.Load()), ast.Tuple(load_names, ast.Load()), diff --git a/_pytest/assertion/util.py b/_pytest/assertion/util.py --- a/_pytest/assertion/util.py +++ b/_pytest/assertion/util.py @@ -1,8 +1,13 @@ """Utilities for assertion debugging""" import py +try: + from collections import Sequence +except ImportError: + Sequence = list BuiltinAssertionError = py.builtin.builtins.AssertionError +u = py.builtin._totext # The _reprcompare attribute on the util module is used by the new assertion # interpretation code and assertion rewriter to detect this plugin was @@ -10,6 +15,7 @@ # DebugInterpreter. _reprcompare = None + def format_explanation(explanation): """This formats an explanation @@ -20,7 +26,18 @@ for when one explanation needs to span multiple lines, e.g. when displaying diffs. """ - # simplify 'assert False where False = ...' + explanation = _collapse_false(explanation) + lines = _split_explanation(explanation) + result = _format_lines(lines) + return u('\n').join(result) + + +def _collapse_false(explanation): + """Collapse expansions of False + + So this strips out any "assert False\n{where False = ...\n}" + blocks. + """ where = 0 while True: start = where = explanation.find("False\n{False = ", where) @@ -42,28 +59,48 @@ explanation = (explanation[:start] + explanation[start+15:end-1] + explanation[end+1:]) where -= 17 - raw_lines = (explanation or '').split('\n') - # escape newlines not followed by {, } and ~ + return explanation + + +def _split_explanation(explanation): + """Return a list of individual lines in the explanation + + This will return a list of lines split on '\n{', '\n}' and '\n~'. + Any other newlines will be escaped and appear in the line as the + literal '\n' characters. + """ + raw_lines = (explanation or u('')).split('\n') lines = [raw_lines[0]] for l in raw_lines[1:]: if l.startswith('{') or l.startswith('}') or l.startswith('~'): lines.append(l) else: lines[-1] += '\\n' + l + return lines + +def _format_lines(lines): + """Format the individual lines + + This will replace the '{', '}' and '~' characters of our mini + formatting language with the proper 'where ...', 'and ...' and ' + + ...' text, taking care of indentation along the way. + + Return a list of formatted lines. + """ result = lines[:1] stack = [0] stackcnt = [0] for line in lines[1:]: if line.startswith('{'): if stackcnt[-1]: - s = 'and ' + s = u('and ') else: - s = 'where ' + s = u('where ') stack.append(len(result)) stackcnt[-1] += 1 stackcnt.append(0) - result.append(' +' + ' '*(len(stack)-1) + s + line[1:]) + result.append(u(' +') + u(' ')*(len(stack)-1) + s + line[1:]) elif line.startswith('}'): assert line.startswith('}') stack.pop() @@ -71,9 +108,9 @@ result[stack[-1]] += line[1:] else: assert line.startswith('~') - result.append(' '*len(stack) + line[1:]) + result.append(u(' ')*len(stack) + line[1:]) assert len(stack) == 1 - return '\n'.join(result) + return result # Provide basestring in python3 @@ -83,132 +120,163 @@ basestring = str -def assertrepr_compare(op, left, right): - """return specialised explanations for some operators/operands""" - width = 80 - 15 - len(op) - 2 # 15 chars indentation, 1 space around op +def assertrepr_compare(config, op, left, right): + """Return specialised explanations for some operators/operands""" + width = 80 - 15 - len(op) - 2 # 15 chars indentation, 1 space around op left_repr = py.io.saferepr(left, maxsize=int(width/2)) right_repr = py.io.saferepr(right, maxsize=width-len(left_repr)) - summary = '%s %s %s' % (left_repr, op, right_repr) + summary = u('%s %s %s') % (left_repr, op, right_repr) - issequence = lambda x: isinstance(x, (list, tuple)) + issequence = lambda x: (isinstance(x, (list, tuple, Sequence)) + and not isinstance(x, basestring)) istext = lambda x: isinstance(x, basestring) isdict = lambda x: isinstance(x, dict) - isset = lambda x: isinstance(x, set) + isset = lambda x: isinstance(x, (set, frozenset)) + verbose = config.getoption('verbose') explanation = None try: if op == '==': if istext(left) and istext(right): - explanation = _diff_text(left, right) + explanation = _diff_text(left, right, verbose) elif issequence(left) and issequence(right): - explanation = _compare_eq_sequence(left, right) + explanation = _compare_eq_sequence(left, right, verbose) elif isset(left) and isset(right): - explanation = _compare_eq_set(left, right) + explanation = _compare_eq_set(left, right, verbose) elif isdict(left) and isdict(right): - explanation = _diff_text(py.std.pprint.pformat(left), - py.std.pprint.pformat(right)) + explanation = _compare_eq_dict(left, right, verbose) elif op == 'not in': if istext(left) and istext(right): - explanation = _notin_text(left, right) - except py.builtin._sysex: - raise - except: + explanation = _notin_text(left, right, verbose) + except Exception: excinfo = py.code.ExceptionInfo() - explanation = ['(pytest_assertion plugin: representation of ' - 'details failed. Probably an object has a faulty __repr__.)', - str(excinfo) - ] - + explanation = [ + u('(pytest_assertion plugin: representation of details failed. ' + 'Probably an object has a faulty __repr__.)'), + u(excinfo)] if not explanation: return None - # Don't include pageloads of data, should be configurable - if len(''.join(explanation)) > 80*8: - explanation = ['Detailed information too verbose, truncated'] - return [summary] + explanation -def _diff_text(left, right): - """Return the explanation for the diff between text +def _diff_text(left, right, verbose=False): + """Return the explanation for the diff between text or bytes - This will skip leading and trailing characters which are - identical to keep the diff minimal. + Unless --verbose is used this will skip leading and trailing + characters which are identical to keep the diff minimal. + + If the input are bytes they will be safely converted to text. """ explanation = [] - i = 0 # just in case left or right has zero length - for i in range(min(len(left), len(right))): - if left[i] != right[i]: - break - if i > 42: - i -= 10 # Provide some context - explanation = ['Skipping %s identical ' - 'leading characters in diff' % i] - left = left[i:] - right = right[i:] - if len(left) == len(right): - for i in range(len(left)): - if left[-i] != right[-i]: + if isinstance(left, py.builtin.bytes): + left = u(repr(left)[1:-1]).replace(r'\n', '\n') + if isinstance(right, py.builtin.bytes): + right = u(repr(right)[1:-1]).replace(r'\n', '\n') + if not verbose: + i = 0 # just in case left or right has zero length + for i in range(min(len(left), len(right))): + if left[i] != right[i]: break if i > 42: - i -= 10 # Provide some context - explanation += ['Skipping %s identical ' - 'trailing characters in diff' % i] - left = left[:-i] - right = right[:-i] + i -= 10 # Provide some context + explanation = [u('Skipping %s identical leading ' + 'characters in diff, use -v to show') % i] + left = left[i:] + right = right[i:] + if len(left) == len(right): + for i in range(len(left)): + if left[-i] != right[-i]: + break + if i > 42: + i -= 10 # Provide some context + explanation += [u('Skipping %s identical trailing ' + 'characters in diff, use -v to show') % i] + left = left[:-i] + right = right[:-i] explanation += [line.strip('\n') for line in py.std.difflib.ndiff(left.splitlines(), right.splitlines())] return explanation -def _compare_eq_sequence(left, right): +def _compare_eq_sequence(left, right, verbose=False): explanation = [] for i in range(min(len(left), len(right))): if left[i] != right[i]: - explanation += ['At index %s diff: %r != %r' % - (i, left[i], right[i])] + explanation += [u('At index %s diff: %r != %r') + % (i, left[i], right[i])] break if len(left) > len(right): - explanation += ['Left contains more items, ' - 'first extra item: %s' % py.io.saferepr(left[len(right)],)] + explanation += [u('Left contains more items, first extra item: %s') + % py.io.saferepr(left[len(right)],)] elif len(left) < len(right): - explanation += ['Right contains more items, ' - 'first extra item: %s' % py.io.saferepr(right[len(left)],)] - return explanation # + _diff_text(py.std.pprint.pformat(left), - # py.std.pprint.pformat(right)) + explanation += [ + u('Right contains more items, first extra item: %s') % + py.io.saferepr(right[len(left)],)] + return explanation # + _diff_text(py.std.pprint.pformat(left), + # py.std.pprint.pformat(right)) -def _compare_eq_set(left, right): +def _compare_eq_set(left, right, verbose=False): explanation = [] diff_left = left - right diff_right = right - left if diff_left: - explanation.append('Extra items in the left set:') + explanation.append(u('Extra items in the left set:')) for item in diff_left: explanation.append(py.io.saferepr(item)) if diff_right: - explanation.append('Extra items in the right set:') + explanation.append(u('Extra items in the right set:')) for item in diff_right: explanation.append(py.io.saferepr(item)) return explanation -def _notin_text(term, text): +def _compare_eq_dict(left, right, verbose=False): + explanation = [] + common = set(left).intersection(set(right)) + same = dict((k, left[k]) for k in common if left[k] == right[k]) + if same and not verbose: + explanation += [u('Omitting %s identical items, use -v to show') % + len(same)] + elif same: + explanation += [u('Common items:')] + explanation += py.std.pprint.pformat(same).splitlines() + diff = set(k for k in common if left[k] != right[k]) + if diff: + explanation += [u('Differing items:')] + for k in diff: + explanation += [py.io.saferepr({k: left[k]}) + ' != ' + + py.io.saferepr({k: right[k]})] + extra_left = set(left) - set(right) + if extra_left: + explanation.append(u('Left contains more items:')) + explanation.extend(py.std.pprint.pformat( + dict((k, left[k]) for k in extra_left)).splitlines()) + extra_right = set(right) - set(left) + if extra_right: + explanation.append(u('Right contains more items:')) + explanation.extend(py.std.pprint.pformat( + dict((k, right[k]) for k in extra_right)).splitlines()) + return explanation + + +def _notin_text(term, text, verbose=False): index = text.find(term) head = text[:index] tail = text[index+len(term):] correct_text = head + tail - diff = _diff_text(correct_text, text) - newdiff = ['%s is contained here:' % py.io.saferepr(term, maxsize=42)] + diff = _diff_text(correct_text, text, verbose) + newdiff = [u('%s is contained here:') % py.io.saferepr(term, maxsize=42)] for line in diff: - if line.startswith('Skipping'): + if line.startswith(u('Skipping')): continue - if line.startswith('- '): + if line.startswith(u('- ')): continue - if line.startswith('+ '): - newdiff.append(' ' + line[2:]) + if line.startswith(u('+ ')): + newdiff.append(u(' ') + line[2:]) else: newdiff.append(line) return newdiff diff --git a/_pytest/capture.py b/_pytest/capture.py --- a/_pytest/capture.py +++ b/_pytest/capture.py @@ -1,43 +1,114 @@ -""" per-test stdout/stderr capturing mechanisms, ``capsys`` and ``capfd`` function arguments. """ +""" + per-test stdout/stderr capturing mechanisms, + ``capsys`` and ``capfd`` function arguments. +""" +# note: py.io capture was where copied from +# pylib 1.4.20.dev2 (rev 13d9af95547e) +import sys +import os +import tempfile -import pytest, py -import os +import py +import pytest + +try: + from io import StringIO +except ImportError: + from StringIO import StringIO + +try: + from io import BytesIO +except ImportError: + class BytesIO(StringIO): + def write(self, data): + if isinstance(data, unicode): + raise TypeError("not a byte value: %r" % (data,)) + StringIO.write(self, data) + +if sys.version_info < (3, 0): + class TextIO(StringIO): + def write(self, data): + if not isinstance(data, unicode): + enc = getattr(self, '_encoding', 'UTF-8') + data = unicode(data, enc, 'replace') + StringIO.write(self, data) +else: + TextIO = StringIO + + +patchsysdict = {0: 'stdin', 1: 'stdout', 2: 'stderr'} + def pytest_addoption(parser): group = parser.getgroup("general") - group._addoption('--capture', action="store", default=None, - metavar="method", type="choice", choices=['fd', 'sys', 'no'], + group._addoption( + '--capture', action="store", default=None, + metavar="method", choices=['fd', 'sys', 'no'], help="per-test capturing method: one of fd (default)|sys|no.") - group._addoption('-s', action="store_const", const="no", dest="capture", + group._addoption( + '-s', action="store_const", const="no", dest="capture", help="shortcut for --capture=no.") + @pytest.mark.tryfirst -def pytest_cmdline_parse(pluginmanager, args): - # we want to perform capturing already for plugin/conftest loading - if '-s' in args or "--capture=no" in args: - method = "no" - elif hasattr(os, 'dup') and '--capture=sys' not in args: +def pytest_load_initial_conftests(early_config, parser, args, __multicall__): + ns = parser.parse_known_args(args) + method = ns.capture + if not method: method = "fd" - else: + if method == "fd" and not hasattr(os, "dup"): method = "sys" capman = CaptureManager(method) - pluginmanager.register(capman, "capturemanager") + early_config.pluginmanager.register(capman, "capturemanager") + + # make sure that capturemanager is properly reset at final shutdown + def teardown(): + try: + capman.reset_capturings() + except ValueError: + pass + + early_config.pluginmanager.add_shutdown(teardown) + + # make sure logging does not raise exceptions at the end + def silence_logging_at_shutdown(): + if "logging" in sys.modules: + sys.modules["logging"].raiseExceptions = False + early_config.pluginmanager.add_shutdown(silence_logging_at_shutdown) + + # finally trigger conftest loading but while capturing (issue93) + capman.resumecapture() + try: + try: + return __multicall__.execute() + finally: + out, err = capman.suspendcapture() + except: + sys.stdout.write(out) + sys.stderr.write(err) + raise + def addouterr(rep, outerr): for secname, content in zip(["out", "err"], outerr): if content: rep.sections.append(("Captured std%s" % secname, content)) + class NoCapture: def startall(self): pass + def resume(self): pass + def reset(self): pass + def suspend(self): return "", "" + class CaptureManager: def __init__(self, defaultmethod=None): self._method2capture = {} @@ -45,21 +116,23 @@ def _maketempfile(self): f = py.std.tempfile.TemporaryFile() - newf = py.io.dupfile(f, encoding="UTF-8") + newf = dupfile(f, encoding="UTF-8") f.close() return newf def _makestringio(self): - return py.io.TextIO() + return TextIO() def _getcapture(self, method): if method == "fd": - return py.io.StdCaptureFD(now=False, - out=self._maketempfile(), err=self._maketempfile() + return StdCaptureFD( + out=self._maketempfile(), + err=self._maketempfile(), ) elif method == "sys": - return py.io.StdCapture(now=False, - out=self._makestringio(), err=self._makestringio() + return StdCapture( + out=self._makestringio(), + err=self._makestringio(), ) elif method == "no": return NoCapture() @@ -74,23 +147,24 @@ method = config._conftest.rget("option_capture", path=fspath) except KeyError: method = "fd" - if method == "fd" and not hasattr(os, 'dup'): # e.g. jython + if method == "fd" and not hasattr(os, 'dup'): # e.g. jython method = "sys" return method def reset_capturings(self): - for name, cap in self._method2capture.items(): + for cap in self._method2capture.values(): cap.reset() def resumecapture_item(self, item): method = self._getmethod(item.config, item.fspath) if not hasattr(item, 'outerr'): - item.outerr = ('', '') # we accumulate outerr on the item + item.outerr = ('', '') # we accumulate outerr on the item return self.resumecapture(method) def resumecapture(self, method=None): if hasattr(self, '_capturing'): - raise ValueError("cannot resume, already capturing with %r" % + raise ValueError( + "cannot resume, already capturing with %r" % (self._capturing,)) if method is None: method = self._defaultmethod @@ -119,30 +193,29 @@ return "", "" def activate_funcargs(self, pyfuncitem): - if not hasattr(pyfuncitem, 'funcargs'): - return - assert not hasattr(self, '_capturing_funcargs') - self._capturing_funcargs = capturing_funcargs = [] - for name, capfuncarg in pyfuncitem.funcargs.items(): - if name in ('capsys', 'capfd'): - capturing_funcargs.append(capfuncarg) - capfuncarg._start() + funcargs = getattr(pyfuncitem, "funcargs", None) + if funcargs is not None: + for name, capfuncarg in funcargs.items(): + if name in ('capsys', 'capfd'): + assert not hasattr(self, '_capturing_funcarg') + self._capturing_funcarg = capfuncarg + capfuncarg._start() def deactivate_funcargs(self): - capturing_funcargs = getattr(self, '_capturing_funcargs', None) - if capturing_funcargs is not None: - while capturing_funcargs: - capfuncarg = capturing_funcargs.pop() - capfuncarg._finalize() - del self._capturing_funcargs + capturing_funcarg = getattr(self, '_capturing_funcarg', None) + if capturing_funcarg: + outerr = capturing_funcarg._finalize() + del self._capturing_funcarg + return outerr def pytest_make_collect_report(self, __multicall__, collector): method = self._getmethod(collector.config, collector.fspath) try: self.resumecapture(method) except ValueError: - return # recursive collect, XXX refactor capturing - # to allow for more lightweight recursive capturing + # recursive collect, XXX refactor capturing + # to allow for more lightweight recursive capturing + return try: rep = __multicall__.execute() finally: @@ -169,46 +242,371 @@ @pytest.mark.tryfirst def pytest_runtest_makereport(self, __multicall__, item, call): - self.deactivate_funcargs() + funcarg_outerr = self.deactivate_funcargs() rep = __multicall__.execute() outerr = self.suspendcapture(item) - if not rep.passed: - addouterr(rep, outerr) + if funcarg_outerr is not None: + outerr = (outerr[0] + funcarg_outerr[0], + outerr[1] + funcarg_outerr[1]) + addouterr(rep, outerr) if not rep.passed or rep.when == "teardown": outerr = ('', '') item.outerr = outerr return rep +error_capsysfderror = "cannot use capsys and capfd at the same time" + + def pytest_funcarg__capsys(request): """enables capturing of writes to sys.stdout/sys.stderr and makes captured output available via ``capsys.readouterr()`` method calls which return a ``(out, err)`` tuple. """ - return CaptureFuncarg(py.io.StdCapture) + if "capfd" in request._funcargs: + raise request.raiseerror(error_capsysfderror) + return CaptureFixture(StdCapture) + def pytest_funcarg__capfd(request): """enables capturing of writes to file descriptors 1 and 2 and makes captured output available via ``capsys.readouterr()`` method calls which return a ``(out, err)`` tuple. """ + if "capsys" in request._funcargs: + request.raiseerror(error_capsysfderror) if not hasattr(os, 'dup'): - py.test.skip("capfd funcarg needs os.dup") - return CaptureFuncarg(py.io.StdCaptureFD) + pytest.skip("capfd funcarg needs os.dup") + return CaptureFixture(StdCaptureFD) -class CaptureFuncarg: + +class CaptureFixture: def __init__(self, captureclass): - self.capture = captureclass(now=False) + self._capture = captureclass() def _start(self): - self.capture.startall() + self._capture.startall() def _finalize(self): - if hasattr(self, 'capture'): - self.capture.reset() - del self.capture + if hasattr(self, '_capture'): + outerr = self._outerr = self._capture.reset() + del self._capture + return outerr def readouterr(self): - return self.capture.readouterr() + try: + return self._capture.readouterr() + except AttributeError: + return self._outerr def close(self): self._finalize() + + +class FDCapture: + """ Capture IO to/from a given os-level filedescriptor. """ + + def __init__(self, targetfd, tmpfile=None, patchsys=False): + """ save targetfd descriptor, and open a new + temporary file there. If no tmpfile is + specified a tempfile.Tempfile() will be opened + in text mode. + """ + self.targetfd = targetfd + if tmpfile is None and targetfd != 0: + f = tempfile.TemporaryFile('wb+') + tmpfile = dupfile(f, encoding="UTF-8") + f.close() + self.tmpfile = tmpfile + self._savefd = os.dup(self.targetfd) + if patchsys: + self._oldsys = getattr(sys, patchsysdict[targetfd]) + + def start(self): + try: + os.fstat(self._savefd) + except OSError: + raise ValueError( + "saved filedescriptor not valid, " + "did you call start() twice?") + if self.targetfd == 0 and not self.tmpfile: + fd = os.open(os.devnull, os.O_RDONLY) + os.dup2(fd, 0) + os.close(fd) + if hasattr(self, '_oldsys'): + setattr(sys, patchsysdict[self.targetfd], DontReadFromInput()) + else: + os.dup2(self.tmpfile.fileno(), self.targetfd) + if hasattr(self, '_oldsys'): + setattr(sys, patchsysdict[self.targetfd], self.tmpfile) + + def done(self): + """ unpatch and clean up, returns the self.tmpfile (file object) + """ + os.dup2(self._savefd, self.targetfd) + os.close(self._savefd) + if self.targetfd != 0: + self.tmpfile.seek(0) + if hasattr(self, '_oldsys'): + setattr(sys, patchsysdict[self.targetfd], self._oldsys) + return self.tmpfile + + def writeorg(self, data): + """ write a string to the original file descriptor + """ + tempfp = tempfile.TemporaryFile() + try: + os.dup2(self._savefd, tempfp.fileno()) + tempfp.write(data) + finally: + tempfp.close() + + +def dupfile(f, mode=None, buffering=0, raising=False, encoding=None): + """ return a new open file object that's a duplicate of f + + mode is duplicated if not given, 'buffering' controls + buffer size (defaulting to no buffering) and 'raising' + defines whether an exception is raised when an incompatible + file object is passed in (if raising is False, the file + object itself will be returned) + """ + try: + fd = f.fileno() + mode = mode or f.mode + except AttributeError: + if raising: + raise + return f + newfd = os.dup(fd) + if sys.version_info >= (3, 0): + if encoding is not None: + mode = mode.replace("b", "") + buffering = True + return os.fdopen(newfd, mode, buffering, encoding, closefd=True) + else: + f = os.fdopen(newfd, mode, buffering) + if encoding is not None: + return EncodedFile(f, encoding) + return f + + +class EncodedFile(object): + def __init__(self, _stream, encoding): + self._stream = _stream + self.encoding = encoding + + def write(self, obj): + if isinstance(obj, unicode): + obj = obj.encode(self.encoding) + self._stream.write(obj) + + def writelines(self, linelist): + data = ''.join(linelist) + self.write(data) + + def __getattr__(self, name): + return getattr(self._stream, name) + + +class Capture(object): + def reset(self): + """ reset sys.stdout/stderr and return captured output as strings. """ + if hasattr(self, '_reset'): + raise ValueError("was already reset") + self._reset = True + outfile, errfile = self.done(save=False) + out, err = "", "" + if outfile and not outfile.closed: + out = outfile.read() + outfile.close() + if errfile and errfile != outfile and not errfile.closed: + err = errfile.read() + errfile.close() + return out, err + + def suspend(self): + """ return current snapshot captures, memorize tempfiles. """ + outerr = self.readouterr() + outfile, errfile = self.done() + return outerr + + +class StdCaptureFD(Capture): + """ This class allows to capture writes to FD1 and FD2 + and may connect a NULL file to FD0 (and prevent + reads from sys.stdin). If any of the 0,1,2 file descriptors + is invalid it will not be captured. + """ + def __init__(self, out=True, err=True, in_=True, patchsys=True): + self._options = { + "out": out, + "err": err, + "in_": in_, + "patchsys": patchsys, + } + self._save() + + def _save(self): + in_ = self._options['in_'] + out = self._options['out'] + err = self._options['err'] + patchsys = self._options['patchsys'] + if in_: + try: + self.in_ = FDCapture( + 0, tmpfile=None, + patchsys=patchsys) + except OSError: + pass + if out: + tmpfile = None + if hasattr(out, 'write'): + tmpfile = out + try: + self.out = FDCapture( + 1, tmpfile=tmpfile, + patchsys=patchsys) + self._options['out'] = self.out.tmpfile + except OSError: + pass + if err: + if hasattr(err, 'write'): + tmpfile = err + else: + tmpfile = None + try: + self.err = FDCapture( + 2, tmpfile=tmpfile, + patchsys=patchsys) + self._options['err'] = self.err.tmpfile + except OSError: + pass + + def startall(self): + if hasattr(self, 'in_'): + self.in_.start() + if hasattr(self, 'out'): + self.out.start() + if hasattr(self, 'err'): + self.err.start() + + def resume(self): + """ resume capturing with original temp files. """ + self.startall() + + def done(self, save=True): + """ return (outfile, errfile) and stop capturing. """ + outfile = errfile = None + if hasattr(self, 'out') and not self.out.tmpfile.closed: + outfile = self.out.done() + if hasattr(self, 'err') and not self.err.tmpfile.closed: + errfile = self.err.done() + if hasattr(self, 'in_'): + self.in_.done() + if save: + self._save() + return outfile, errfile + + def readouterr(self): + """ return snapshot value of stdout/stderr capturings. """ + out = self._readsnapshot('out') + err = self._readsnapshot('err') + return out, err + + def _readsnapshot(self, name): + if hasattr(self, name): + f = getattr(self, name).tmpfile + else: + return '' + + f.seek(0) + res = f.read() + enc = getattr(f, "encoding", None) + if enc: + res = py.builtin._totext(res, enc, "replace") + f.truncate(0) + f.seek(0) + return res + + +class StdCapture(Capture): + """ This class allows to capture writes to sys.stdout|stderr "in-memory" + and will raise errors on tries to read from sys.stdin. It only + modifies sys.stdout|stderr|stdin attributes and does not + touch underlying File Descriptors (use StdCaptureFD for that). + """ + def __init__(self, out=True, err=True, in_=True): + self._oldout = sys.stdout + self._olderr = sys.stderr + self._oldin = sys.stdin + if out and not hasattr(out, 'file'): + out = TextIO() + self.out = out + if err: + if not hasattr(err, 'write'): + err = TextIO() + self.err = err + self.in_ = in_ + + def startall(self): + if self.out: + sys.stdout = self.out + if self.err: + sys.stderr = self.err + if self.in_: + sys.stdin = self.in_ = DontReadFromInput() + + def done(self, save=True): + """ return (outfile, errfile) and stop capturing. """ + outfile = errfile = None + if self.out and not self.out.closed: + sys.stdout = self._oldout + outfile = self.out + outfile.seek(0) + if self.err and not self.err.closed: + sys.stderr = self._olderr + errfile = self.err + errfile.seek(0) + if self.in_: + sys.stdin = self._oldin + return outfile, errfile + + def resume(self): + """ resume capturing with original temp files. """ + self.startall() + + def readouterr(self): + """ return snapshot value of stdout/stderr capturings. """ + out = err = "" + if self.out: + out = self.out.getvalue() + self.out.truncate(0) + self.out.seek(0) + if self.err: + err = self.err.getvalue() + self.err.truncate(0) + self.err.seek(0) + return out, err + + +class DontReadFromInput: + """Temporary stub class. Ideally when stdin is accessed, the + capturing should be turned off, with possibly all data captured + so far sent to the screen. This should be configurable, though, + because in automated test runs it is better to crash than + hang indefinitely. + """ + def read(self, *args): + raise IOError("reading from stdin while output is captured") + readline = read + readlines = read + __iter__ = read + + def fileno(self): + raise ValueError("redirected Stdin is pseudofile, has no fileno()") + + def isatty(self): + return False + + def close(self): + pass diff --git a/_pytest/config.py b/_pytest/config.py --- a/_pytest/config.py +++ b/_pytest/config.py @@ -1,25 +1,91 @@ """ command line options, ini-file and conftest.py processing. """ import py +# DON't import pytest here because it causes import cycle troubles import sys, os +from _pytest import hookspec # the extension point definitions from _pytest.core import PluginManager -import pytest -def pytest_cmdline_parse(pluginmanager, args): - config = Config(pluginmanager) - config.parse(args) - return config +# pytest startup -def pytest_unconfigure(config): - while 1: - try: - fin = config._cleanup.pop() - except IndexError: - break - fin() +def main(args=None, plugins=None): + """ return exit code, after performing an in-process test run. + + :arg args: list of command line arguments. + + :arg plugins: list of plugin objects to be auto-registered during + initialization. + """ + config = _prepareconfig(args, plugins) + return config.hook.pytest_cmdline_main(config=config) + +class cmdline: # compatibility namespace + main = staticmethod(main) + +class UsageError(Exception): + """ error in pytest usage or invocation""" + +_preinit = [] + +default_plugins = ( + "mark main terminal runner python pdb unittest capture skipping " + "tmpdir monkeypatch recwarn pastebin helpconfig nose assertion genscript " + "junitxml resultlog doctest").split() + +def _preloadplugins(): + assert not _preinit + _preinit.append(get_plugin_manager()) + +def get_plugin_manager(): + if _preinit: + return _preinit.pop(0) + # subsequent calls to main will create a fresh instance + pluginmanager = PytestPluginManager() + pluginmanager.config = Config(pluginmanager) # XXX attr needed? + for spec in default_plugins: + pluginmanager.import_plugin(spec) + return pluginmanager + +def _prepareconfig(args=None, plugins=None): + if args is None: + args = sys.argv[1:] + elif isinstance(args, py.path.local): + args = [str(args)] + elif not isinstance(args, (tuple, list)): + if not isinstance(args, str): + raise ValueError("not a string or argument list: %r" % (args,)) + args = py.std.shlex.split(args) + pluginmanager = get_plugin_manager() + if plugins: + for plugin in plugins: + pluginmanager.register(plugin) + return pluginmanager.hook.pytest_cmdline_parse( + pluginmanager=pluginmanager, args=args) + +class PytestPluginManager(PluginManager): + def __init__(self, hookspecs=[hookspec]): + super(PytestPluginManager, self).__init__(hookspecs=hookspecs) + self.register(self) + if os.environ.get('PYTEST_DEBUG'): + err = sys.stderr + encoding = getattr(err, 'encoding', 'utf8') + try: + err = py.io.dupfile(err, encoding=encoding) + except Exception: + pass + self.trace.root.setwriter(err.write) + + def pytest_configure(self, config): + config.addinivalue_line("markers", + "tryfirst: mark a hook implementation function such that the " + "plugin machinery will try to call it first/as early as possible.") + config.addinivalue_line("markers", + "trylast: mark a hook implementation function such that the " + "plugin machinery will try to call it last/as late as possible.") + class Parser: - """ Parser for command line arguments. """ + """ Parser for command line arguments and ini-file values. """ def __init__(self, usage=None, processopt=None): self._anonymous = OptionGroup("custom options", parser=self) @@ -35,15 +101,17 @@ if option.dest: self._processopt(option) - def addnote(self, note): - self._notes.append(note) - def getgroup(self, name, description="", after=None): """ get (or create) a named option Group. - :name: unique name of the option group. + :name: name of the option group. :description: long description for --help output. :after: name of other group, used for ordering --help output. + + The returned group object has an ``addoption`` method with the same + signature as :py:func:`parser.addoption + <_pytest.config.Parser.addoption>` but will be shown in the + respective group in the output of ``pytest. --help``. """ for group in self._groups: if group.name == name: @@ -57,33 +125,222 @@ return group def addoption(self, *opts, **attrs): - """ add an optparse-style option. """ + """ register a command line option. + + :opts: option names, can be short or long options. + :attrs: same attributes which the ``add_option()`` function of the + `argparse library + <http://docs.python.org/2/library/argparse.html>`_ + accepts. + + After command line parsing options are available on the pytest config + object via ``config.option.NAME`` where ``NAME`` is usually set + by passing a ``dest`` attribute, for example + ``addoption("--long", dest="NAME", ...)``. + """ self._anonymous.addoption(*opts, **attrs) def parse(self, args): - self.optparser = optparser = MyOptionParser(self) + from _pytest._argcomplete import try_argcomplete + self.optparser = self._getparser() + try_argcomplete(self.optparser) + return self.optparser.parse_args([str(x) for x in args]) + + def _getparser(self): + from _pytest._argcomplete import filescompleter + optparser = MyOptionParser(self) groups = self._groups + [self._anonymous] for group in groups: if group.options: desc = group.description or group.name - optgroup = py.std.optparse.OptionGroup(optparser, desc) - optgroup.add_options(group.options) - optparser.add_option_group(optgroup) - return self.optparser.parse_args([str(x) for x in args]) + arggroup = optparser.add_argument_group(desc) + for option in group.options: + n = option.names() + a = option.attrs() + arggroup.add_argument(*n, **a) + # bash like autocompletion for dirs (appending '/') + optparser.add_argument(FILE_OR_DIR, nargs='*' + ).completer=filescompleter + return optparser def parse_setoption(self, args, option): - parsedoption, args = self.parse(args) + parsedoption = self.parse(args) for name, value in parsedoption.__dict__.items(): setattr(option, name, value) - return args + return getattr(parsedoption, FILE_OR_DIR) + + def parse_known_args(self, args): + optparser = self._getparser() + args = [str(x) for x in args] + return optparser.parse_known_args(args)[0] def addini(self, name, help, type=None, default=None): - """ add an ini-file option with the given name and description. """ + """ register an ini-file option. + + :name: name of the ini-variable + :type: type of the variable, can be ``pathlist``, ``args`` or ``linelist``. + :default: default value if no ini-file option exists but is queried. + + The value of ini-variables can be retrieved via a call to + :py:func:`config.getini(name) <_pytest.config.Config.getini>`. + """ assert type in (None, "pathlist", "args", "linelist") self._inidict[name] = (help, type, default) self._ininames.append(name) +class ArgumentError(Exception): + """ + Raised if an Argument instance is created with invalid or + inconsistent arguments. + """ + + def __init__(self, msg, option): + self.msg = msg + self.option_id = str(option) + + def __str__(self): + if self.option_id: + return "option %s: %s" % (self.option_id, self.msg) + else: + return self.msg + + +class Argument: + """class that mimics the necessary behaviour of py.std.optparse.Option """ + _typ_map = { + 'int': int, + 'string': str, + } + # enable after some grace period for plugin writers + TYPE_WARN = False + + def __init__(self, *names, **attrs): + """store parms in private vars for use in add_argument""" + self._attrs = attrs + self._short_opts = [] + self._long_opts = [] + self.dest = attrs.get('dest') + if self.TYPE_WARN: + try: + help = attrs['help'] + if '%default' in help: + py.std.warnings.warn( + 'pytest now uses argparse. "%default" should be' + ' changed to "%(default)s" ', + FutureWarning, + stacklevel=3) + except KeyError: + pass + try: + typ = attrs['type'] + except KeyError: + pass + else: + # this might raise a keyerror as well, don't want to catch that + if isinstance(typ, py.builtin._basestring): + if typ == 'choice': + if self.TYPE_WARN: + py.std.warnings.warn( + 'type argument to addoption() is a string %r.' + ' For parsearg this is optional and when supplied ' + ' should be a type.' + ' (options: %s)' % (typ, names), + FutureWarning, + stacklevel=3) + # argparse expects a type here take it from + # the type of the first element + attrs['type'] = type(attrs['choices'][0]) + else: + if self.TYPE_WARN: + py.std.warnings.warn( + 'type argument to addoption() is a string %r.' + ' For parsearg this should be a type.' + ' (options: %s)' % (typ, names), + FutureWarning, + stacklevel=3) + attrs['type'] = Argument._typ_map[typ] + # used in test_parseopt -> test_parse_defaultgetter + self.type = attrs['type'] + else: + self.type = typ + try: + # attribute existence is tested in Config._processopt + self.default = attrs['default'] + except KeyError: + pass + self._set_opt_strings(names) + if not self.dest: + if self._long_opts: + self.dest = self._long_opts[0][2:].replace('-', '_') + else: + try: + self.dest = self._short_opts[0][1:] + except IndexError: + raise ArgumentError( + 'need a long or short option', self) + + def names(self): + return self._short_opts + self._long_opts + + def attrs(self): + # update any attributes set by processopt + attrs = 'default dest help'.split() + if self.dest: + attrs.append(self.dest) + for attr in attrs: + try: + self._attrs[attr] = getattr(self, attr) + except AttributeError: + pass + if self._attrs.get('help'): + a = self._attrs['help'] + a = a.replace('%default', '%(default)s') + #a = a.replace('%prog', '%(prog)s') + self._attrs['help'] = a + return self._attrs + + def _set_opt_strings(self, opts): + """directly from optparse + + might not be necessary as this is passed to argparse later on""" + for opt in opts: + if len(opt) < 2: + raise ArgumentError( + "invalid option string %r: " + "must be at least two characters long" % opt, self) + elif len(opt) == 2: + if not (opt[0] == "-" and opt[1] != "-"): + raise ArgumentError( + "invalid short option string %r: " + "must be of the form -x, (x any non-dash char)" % opt, + self) + self._short_opts.append(opt) + else: + if not (opt[0:2] == "--" and opt[2] != "-"): + raise ArgumentError( + "invalid long option string %r: " + "must start with --, followed by non-dash" % opt, + self) + self._long_opts.append(opt) + + def __repr__(self): + retval = 'Argument(' + if self._short_opts: + retval += '_short_opts: ' + repr(self._short_opts) + ', ' + if self._long_opts: + retval += '_long_opts: ' + repr(self._long_opts) + ', ' + retval += 'dest: ' + repr(self.dest) + ', ' + if hasattr(self, 'type'): + retval += 'type: ' + repr(self.type) + ', ' + if hasattr(self, 'default'): + retval += 'default: ' + repr(self.default) + ', ' + if retval[-2:] == ', ': # always long enough to test ("Argument(" ) + retval = retval[:-2] + retval += ')' + return retval + + class OptionGroup: def __init__(self, name, description="", parser=None): self.name = name @@ -92,12 +349,18 @@ self.parser = parser def addoption(self, *optnames, **attrs): - """ add an option to this group. """ - option = py.std.optparse.Option(*optnames, **attrs) + """ add an option to this group. + + if a shortened version of a long option is specified it will + be suppressed in the help. addoption('--twowords', '--two-words') + results in help showing '--two-words' only, but --twowords gets + accepted **and** the automatic destination is in args.twowords + """ + option = Argument(*optnames, **attrs) self._addoption_instance(option, shortupper=False) def _addoption(self, *optnames, **attrs): - option = py.std.optparse.Option(*optnames, **attrs) + option = Argument(*optnames, **attrs) self._addoption_instance(option, shortupper=True) _______________________________________________ pypy-commit mailing list pypy-commit@python.org https://mail.python.org/mailman/listinfo/pypy-commit