Author: Amaury Forgeot d'Arc <amaur...@gmail.com> Branch: py3k Changeset: r58276:76608832a8d1 Date: 2012-10-20 17:03 +0200 http://bitbucket.org/pypy/pypy/changeset/76608832a8d1/
Log: hg merge default diff --git a/pypy/module/_cffi_backend/ctypefunc.py b/pypy/module/_cffi_backend/ctypefunc.py --- a/pypy/module/_cffi_backend/ctypefunc.py +++ b/pypy/module/_cffi_backend/ctypefunc.py @@ -4,8 +4,9 @@ import sys from pypy.interpreter.error import OperationError, operationerrfmt +from pypy.interpreter.error import wrap_oserror from pypy.rpython.lltypesystem import lltype, llmemory, rffi -from pypy.rlib import jit, clibffi, jit_libffi +from pypy.rlib import jit, clibffi, jit_libffi, rposix from pypy.rlib.jit_libffi import CIF_DESCRIPTION, CIF_DESCRIPTION_P from pypy.rlib.jit_libffi import FFI_TYPE, FFI_TYPE_P, FFI_TYPE_PP from pypy.rlib.jit_libffi import SIZE_OF_FFI_ARG @@ -147,9 +148,13 @@ argtype = self.fargs[i] if isinstance(argtype, W_CTypePointer): data = rffi.ptradd(buffer, cif_descr.exchange_args[i]) - if get_mustfree_flag(data): + flag = get_mustfree_flag(data) + if flag == 1: raw_string = rffi.cast(rffi.CCHARPP, data)[0] lltype.free(raw_string, flavor='raw') + elif flag == 2: + file = rffi.cast(rffi.CCHARPP, data)[0] + rffi_fclose(file) lltype.free(buffer, flavor='raw') return w_res @@ -164,6 +169,27 @@ assert isinstance(abi, int) return space.wrap(abi) +rffi_fdopen = rffi.llexternal("fdopen", [rffi.INT, rffi.CCHARP], rffi.CCHARP) +rffi_fclose = rffi.llexternal("fclose", [rffi.CCHARP], rffi.INT) + +def prepare_file_call_argument(fileobj): + import os + space = fileobj.space + fileobj.direct_flush() + fd = fileobj.direct_fileno() + if fd < 0: + raise OperationError(space.w_ValueError, + space.wrap("file has no OS file descriptor")) + try: + fd2 = os.dup(fd) + f = rffi_fdopen(fd2, fileobj.mode) + if not f: + os.close(fd2) + raise OSError(rposix.get_errno(), "fdopen failed") + except OSError, e: + raise wrap_oserror(space, e) + return f + # ____________________________________________________________ diff --git a/pypy/module/_cffi_backend/ctypeptr.py b/pypy/module/_cffi_backend/ctypeptr.py --- a/pypy/module/_cffi_backend/ctypeptr.py +++ b/pypy/module/_cffi_backend/ctypeptr.py @@ -157,7 +157,7 @@ space = self.space ob = space.interpclass_w(w_ob) if not isinstance(ob, cdataobj.W_CData): - raise self._convert_error("compatible pointer", w_ob) + raise self._convert_error("cdata pointer", w_ob) other = ob.ctype if not isinstance(other, W_CTypePtrBase): from pypy.module._cffi_backend import ctypearray @@ -177,7 +177,8 @@ class W_CTypePointer(W_CTypePtrBase): - _attrs_ = [] + _attrs_ = ['is_file'] + _immutable_fields_ = ['is_file'] def __init__(self, space, ctitem): from pypy.module._cffi_backend import ctypearray @@ -186,6 +187,7 @@ extra = "(*)" # obscure case: see test_array_add else: extra = " *" + self.is_file = (ctitem.name == "struct _IO_FILE") W_CTypePtrBase.__init__(self, space, size, extra, 2, ctitem) def newp(self, w_init): @@ -234,7 +236,7 @@ p = rffi.ptradd(cdata, i * self.ctitem.size) return cdataobj.W_CData(space, p, self) - def _prepare_pointer_call_argument(self, w_init): + def _prepare_pointer_call_argument(self, w_init, cdata): space = self.space if (space.isinstance_w(w_init, space.w_list) or space.isinstance_w(w_init, space.w_tuple)): @@ -243,10 +245,19 @@ space.isinstance_w(w_init, space.w_bytes)): # from a string, we add the null terminator length = space.int_w(space.len(w_init)) + 1 + elif self.is_file: + from pypy.module._file.interp_file import W_File + from pypy.module._cffi_backend import ctypefunc + ob = space.interpclass_w(w_init) + if isinstance(ob, W_File): + result = ctypefunc.prepare_file_call_argument(ob) + rffi.cast(rffi.CCHARPP, cdata)[0] = result + return 2 + return 0 else: - return lltype.nullptr(rffi.CCHARP.TO) + return 0 if self.ctitem.size <= 0: - return lltype.nullptr(rffi.CCHARP.TO) + return 0 try: datasize = ovfcheck(length * self.ctitem.size) except OverflowError: @@ -259,25 +270,19 @@ except Exception: lltype.free(result, flavor='raw') raise - return result + rffi.cast(rffi.CCHARPP, cdata)[0] = result + return 1 def convert_argument_from_object(self, cdata, w_ob): from pypy.module._cffi_backend.ctypefunc import set_mustfree_flag space = self.space ob = space.interpclass_w(w_ob) - if isinstance(ob, cdataobj.W_CData): - buffer = lltype.nullptr(rffi.CCHARP.TO) - else: - buffer = self._prepare_pointer_call_argument(w_ob) - # - if buffer: - rffi.cast(rffi.CCHARPP, cdata)[0] = buffer - set_mustfree_flag(cdata, True) - return True - else: - set_mustfree_flag(cdata, False) + result = (not isinstance(ob, cdataobj.W_CData) and + self._prepare_pointer_call_argument(w_ob, cdata)) + if result == 0: self.convert_from_object(cdata, w_ob) - return False + set_mustfree_flag(cdata, result) + return result def getcfield(self, attr): return self.ctitem.getcfield(attr) diff --git a/pypy/module/_cffi_backend/test/_backend_test_c.py b/pypy/module/_cffi_backend/test/_backend_test_c.py --- a/pypy/module/_cffi_backend/test/_backend_test_c.py +++ b/pypy/module/_cffi_backend/test/_backend_test_c.py @@ -2211,3 +2211,56 @@ buffer(p)[:] = bytearray(b"foo\x00") assert len(p) == 4 assert list(p) == [b"f", b"o", b"o", b"\x00"] + +def test_FILE(): + if sys.platform == "win32": + py.test.skip("testing FILE not implemented") + # + BFILE = new_struct_type("_IO_FILE") + BFILEP = new_pointer_type(BFILE) + BChar = new_primitive_type("char") + BCharP = new_pointer_type(BChar) + BInt = new_primitive_type("int") + BFunc = new_function_type((BCharP, BFILEP), BInt, False) + BFunc2 = new_function_type((BFILEP, BCharP), BInt, True) + ll = find_and_load_library('c') + fputs = ll.load_function(BFunc, "fputs") + fscanf = ll.load_function(BFunc2, "fscanf") + # + import posix + fdr, fdw = posix.pipe() + fr1 = posix.fdopen(fdr, 'r', 256) + fw1 = posix.fdopen(fdw, 'w', 256) + # + fw1.write(b"X") + res = fputs(b"hello world\n", fw1) + assert res >= 0 + fw1.close() + # + p = newp(new_array_type(BCharP, 100), None) + res = fscanf(fr1, b"%s\n", p) + assert res == 1 + assert string(p) == b"Xhello" + fr1.close() + +def test_FILE_only_for_FILE_arg(): + if sys.platform == "win32": + py.test.skip("testing FILE not implemented") + # + B_NOT_FILE = new_struct_type("NOT_FILE") + B_NOT_FILEP = new_pointer_type(B_NOT_FILE) + BChar = new_primitive_type("char") + BCharP = new_pointer_type(BChar) + BInt = new_primitive_type("int") + BFunc = new_function_type((BCharP, B_NOT_FILEP), BInt, False) + ll = find_and_load_library('c') + fputs = ll.load_function(BFunc, "fputs") + # + import posix + fdr, fdw = posix.pipe() + fr1 = posix.fdopen(fdr, 'r') + fw1 = posix.fdopen(fdw, 'w') + # + e = py.test.raises(TypeError, fputs, b"hello world\n", fw1) + assert str(e.value) == ("initializer for ctype 'struct NOT_FILE *' must " + "be a cdata pointer, not file") diff --git a/pypy/module/_sre/test/test_app_sre.py b/pypy/module/_sre/test/test_app_sre.py --- a/pypy/module/_sre/test/test_app_sre.py +++ b/pypy/module/_sre/test/test_app_sre.py @@ -5,17 +5,20 @@ from pypy.interpreter.gateway import app2interp_temp from pypy.conftest import gettestobjspace, option -def init_globals_hack(space): - space.appexec([space.wrap(autopath.this_dir)], """(this_dir): - import builtins as b - import sys, os.path - # Uh-oh, ugly hack - sys.path.insert(0, this_dir) - import support_test_app_sre - b.s = support_test_app_sre - sys.path.pop(0) +def init_app_test(cls, space): + cls.w_s = space.appexec([space.wrap(autopath.this_dir)], + """(this_dir): + import sys + # Uh-oh, ugly hack + sys.path.insert(0, this_dir) + try: + import support_test_app_sre + return support_test_app_sre + finally: + sys.path.pop(0) """) + class AppTestSrePy: def test_magic(self): @@ -315,7 +318,7 @@ def test_scanner_zero_width_match(self): import re, sys if sys.version_info[:2] == (2, 3): - return + skip("2.3 is different here") p = re.compile(".*").scanner("bla") assert ("bla", "") == (p.search().group(0), p.search().group(0)) assert None == p.search() @@ -329,7 +332,7 @@ cls.space = gettestobjspace(usemodules=('_locale',)) except py.test.skip.Exception: cls.space = gettestobjspace(usemodules=('_rawffi',)) - init_globals_hack(cls.space) + init_app_test(cls, cls.space) def setup_method(self, method): import locale @@ -340,11 +343,13 @@ locale.setlocale(locale.LC_ALL, (None, None)) def test_getlower_no_flags(self): + s = self.s UPPER_AE = "\xc4" s.assert_lower_equal([("a", "a"), ("A", "a"), (UPPER_AE, UPPER_AE), ("\u00c4", "\u00c4"), ("\u4444", "\u4444")], 0) def test_getlower_locale(self): + s = self.s import locale, sre_constants UPPER_AE = "\xc4" LOWER_AE = "\xe4" @@ -359,6 +364,7 @@ skip("unsupported locale de_DE") def test_getlower_unicode(self): + s = self.s import sre_constants UPPER_AE = "\xc4" LOWER_AE = "\xe4" @@ -587,34 +593,41 @@ class AppTestOpcodes: def setup_class(cls): + if option.runappdirect: + py.test.skip("can only be run on py.py: _sre opcodes don't match") try: cls.space = gettestobjspace(usemodules=('_locale',)) except py.test.skip.Exception: cls.space = gettestobjspace(usemodules=('_rawffi',)) # This imports support_test_sre as the global "s" - init_globals_hack(cls.space) + init_app_test(cls, cls.space) def test_length_optimization(self): + s = self.s pattern = "bla" opcodes = [s.OPCODES["info"], 3, 3, len(pattern)] \ + s.encode_literal(pattern) + [s.OPCODES["success"]] s.assert_no_match(opcodes, ["b", "bl", "ab"]) def test_literal(self): + s = self.s opcodes = s.encode_literal("bla") + [s.OPCODES["success"]] s.assert_no_match(opcodes, ["bl", "blu"]) s.assert_match(opcodes, ["bla", "blab", "cbla", "bbla"]) def test_not_literal(self): + s = self.s opcodes = s.encode_literal("b") \ + [s.OPCODES["not_literal"], ord("a"), s.OPCODES["success"]] s.assert_match(opcodes, ["bx", "ababy"]) s.assert_no_match(opcodes, ["ba", "jabadu"]) def test_unknown(self): + s = self.s raises(RuntimeError, s.search, [55555], "b") def test_at_beginning(self): + s = self.s for atname in ["at_beginning", "at_beginning_string"]: opcodes = [s.OPCODES["at"], s.ATCODES[atname]] \ + s.encode_literal("bla") + [s.OPCODES["success"]] @@ -622,30 +635,35 @@ s.assert_no_match(opcodes, "abla") def test_at_beginning_line(self): + s = self.s opcodes = [s.OPCODES["at"], s.ATCODES["at_beginning_line"]] \ + s.encode_literal("bla") + [s.OPCODES["success"]] s.assert_match(opcodes, ["bla", "x\nbla"]) s.assert_no_match(opcodes, ["abla", "abla\nubla"]) def test_at_end(self): + s = self.s opcodes = s.encode_literal("bla") \ + [s.OPCODES["at"], s.ATCODES["at_end"], s.OPCODES["success"]] s.assert_match(opcodes, ["bla", "bla\n"]) s.assert_no_match(opcodes, ["blau", "abla\nblau"]) def test_at_end_line(self): + s = self.s opcodes = s.encode_literal("bla") \ + [s.OPCODES["at"], s.ATCODES["at_end_line"], s.OPCODES["success"]] s.assert_match(opcodes, ["bla\n", "bla\nx", "bla"]) s.assert_no_match(opcodes, ["blau"]) def test_at_end_string(self): + s = self.s opcodes = s.encode_literal("bla") \ + [s.OPCODES["at"], s.ATCODES["at_end_string"], s.OPCODES["success"]] s.assert_match(opcodes, "bla") s.assert_no_match(opcodes, ["blau", "bla\n"]) def test_at_boundary(self): + s = self.s for atname in "at_boundary", "at_loc_boundary", "at_uni_boundary": opcodes = s.encode_literal("bla") \ + [s.OPCODES["at"], s.ATCODES[atname], s.OPCODES["success"]] @@ -657,6 +675,7 @@ s.assert_no_match(opcodes, "") def test_at_non_boundary(self): + s = self.s for atname in "at_non_boundary", "at_loc_non_boundary", "at_uni_non_boundary": opcodes = s.encode_literal("bla") \ + [s.OPCODES["at"], s.ATCODES[atname], s.OPCODES["success"]] @@ -664,6 +683,7 @@ s.assert_no_match(opcodes, ["bla ja", "bla"]) def test_at_loc_boundary(self): + s = self.s import locale try: s.void_locale() @@ -683,6 +703,7 @@ skip("locale error") def test_at_uni_boundary(self): + s = self.s UPPER_PI = "\u03a0" LOWER_PI = "\u03c0" opcodes = s.encode_literal("bl") + [s.OPCODES["any"], s.OPCODES["at"], @@ -694,6 +715,7 @@ s.assert_match(opcodes, ["blaha", "bl%sja" % UPPER_PI]) def test_category_loc_word(self): + s = self.s import locale try: s.void_locale() @@ -714,23 +736,27 @@ skip("locale error") def test_any(self): + s = self.s opcodes = s.encode_literal("b") + [s.OPCODES["any"]] \ + s.encode_literal("a") + [s.OPCODES["success"]] s.assert_match(opcodes, ["b a", "bla", "bboas"]) s.assert_no_match(opcodes, ["b\na", "oba", "b"]) def test_any_all(self): + s = self.s opcodes = s.encode_literal("b") + [s.OPCODES["any_all"]] \ + s.encode_literal("a") + [s.OPCODES["success"]] s.assert_match(opcodes, ["b a", "bla", "bboas", "b\na"]) s.assert_no_match(opcodes, ["oba", "b"]) def test_in_failure(self): + s = self.s opcodes = s.encode_literal("b") + [s.OPCODES["in"], 2, s.OPCODES["failure"]] \ + s.encode_literal("a") + [s.OPCODES["success"]] s.assert_no_match(opcodes, ["ba", "bla"]) def test_in_literal(self): + s = self.s opcodes = s.encode_literal("b") + [s.OPCODES["in"], 7] \ + s.encode_literal("la") + [s.OPCODES["failure"], s.OPCODES["failure"]] \ + s.encode_literal("a") + [s.OPCODES["success"]] @@ -738,6 +764,7 @@ s.assert_no_match(opcodes, ["ba", "bja", "blla"]) def test_in_category(self): + s = self.s opcodes = s.encode_literal("b") + [s.OPCODES["in"], 6, s.OPCODES["category"], s.CHCODES["category_digit"], s.OPCODES["category"], s.CHCODES["category_space"], s.OPCODES["failure"]] + s.encode_literal("a") + [s.OPCODES["success"]] @@ -748,6 +775,7 @@ import _sre if _sre.CODESIZE != 2: return + s = self.s # charset bitmap for characters "l" and "h" bitmap = 6 * [0] + [4352] + 9 * [0] opcodes = s.encode_literal("b") + [s.OPCODES["in"], 19, s.OPCODES["charset"]] \ @@ -759,6 +787,7 @@ # disabled because this actually only works on big-endian machines if _sre.CODESIZE != 2: return + s = self.s # constructing bigcharset for lowercase pi (\u03c0) UPPER_PI = u"\u03a0" LOWER_PI = u"\u03c0" @@ -774,6 +803,7 @@ # XXX bigcharset test for ucs4 missing here def test_in_range(self): + s = self.s opcodes = s.encode_literal("b") + [s.OPCODES["in"], 5, s.OPCODES["range"], ord("1"), ord("9"), s.OPCODES["failure"]] \ + s.encode_literal("a") + [s.OPCODES["success"]] @@ -781,6 +811,7 @@ s.assert_no_match(opcodes, ["baa", "b5"]) def test_in_negate(self): + s = self.s opcodes = s.encode_literal("b") + [s.OPCODES["in"], 7, s.OPCODES["negate"]] \ + s.encode_literal("la") + [s.OPCODES["failure"]] \ + s.encode_literal("a") + [s.OPCODES["success"]] @@ -788,12 +819,14 @@ s.assert_no_match(opcodes, ["bla", "baa", "blbla"]) def test_literal_ignore(self): + s = self.s opcodes = s.encode_literal("b") \ + [s.OPCODES["literal_ignore"], ord("a"), s.OPCODES["success"]] s.assert_match(opcodes, ["ba", "bA"]) s.assert_no_match(opcodes, ["bb", "bu"]) def test_not_literal_ignore(self): + s = self.s UPPER_PI = "\u03a0" opcodes = s.encode_literal("b") \ + [s.OPCODES["not_literal_ignore"], ord("a"), s.OPCODES["success"]] @@ -801,6 +834,7 @@ s.assert_no_match(opcodes, ["ba", "bA"]) def test_in_ignore(self): + s = self.s opcodes = s.encode_literal("b") + [s.OPCODES["in_ignore"], 8] \ + s.encode_literal("abc") + [s.OPCODES["failure"]] \ + s.encode_literal("a") + [s.OPCODES["success"]] @@ -808,6 +842,7 @@ s.assert_no_match(opcodes, ["ba", "bja", "blla"]) def test_in_jump_info(self): + s = self.s for opname in "jump", "info": opcodes = s.encode_literal("b") \ + [s.OPCODES[opname], 3, s.OPCODES["failure"], s.OPCODES["failure"]] \ @@ -815,6 +850,7 @@ s.assert_match(opcodes, "ba") def _test_mark(self): + s = self.s # XXX need to rewrite this implementation-independent opcodes = s.encode_literal("a") + [s.OPCODES["mark"], 0] \ + s.encode_literal("b") + [s.OPCODES["mark"], 1, s.OPCODES["success"]] @@ -826,6 +862,7 @@ assert [1, 2] == state.marks def test_branch(self): + s = self.s opcodes = [s.OPCODES["branch"], 7] + s.encode_literal("ab") \ + [s.OPCODES["jump"], 9, 7] + s.encode_literal("cd") \ + [s.OPCODES["jump"], 2, s.OPCODES["failure"], s.OPCODES["success"]] @@ -833,18 +870,21 @@ s.assert_no_match(opcodes, ["aacas", "ac", "bla"]) def test_repeat_one(self): + s = self.s opcodes = [s.OPCODES["repeat_one"], 6, 1, 65535] + s.encode_literal("a") \ + [s.OPCODES["success"]] + s.encode_literal("ab") + [s.OPCODES["success"]] s.assert_match(opcodes, ["aab", "aaaab"]) s.assert_no_match(opcodes, ["ab", "a"]) def test_min_repeat_one(self): + s = self.s opcodes = [s.OPCODES["min_repeat_one"], 5, 1, 65535, s.OPCODES["any"]] \ + [s.OPCODES["success"]] + s.encode_literal("b") + [s.OPCODES["success"]] s.assert_match(opcodes, ["aab", "ardb", "bb"]) s.assert_no_match(opcodes, ["b"]) def test_repeat_maximizing(self): + s = self.s opcodes = [s.OPCODES["repeat"], 5, 1, 65535] + s.encode_literal("a") \ + [s.OPCODES["max_until"]] + s.encode_literal("b") + [s.OPCODES["success"]] s.assert_match(opcodes, ["ab", "aaaab", "baabb"]) @@ -856,6 +896,7 @@ # CPython 2.3 fails with a recursion limit exceeded error here. import sys if not sys.version_info[:2] == (2, 3): + s = self.s opcodes = [s.OPCODES["repeat"], 10, 1, 65535, s.OPCODES["repeat_one"], 6, 0, 65535] + s.encode_literal("a") + [s.OPCODES["success"], s.OPCODES["max_until"], s.OPCODES["success"]] @@ -863,6 +904,7 @@ assert "" == s.search(opcodes, "bb").group(0) def test_repeat_minimizing(self): + s = self.s opcodes = [s.OPCODES["repeat"], 4, 1, 65535, s.OPCODES["any"], s.OPCODES["min_until"]] + s.encode_literal("b") + [s.OPCODES["success"]] s.assert_match(opcodes, ["ab", "aaaab", "baabb"]) @@ -870,24 +912,28 @@ assert "aab" == s.search(opcodes, "aabb").group(0) def test_groupref(self): + s = self.s opcodes = [s.OPCODES["mark"], 0, s.OPCODES["any"], s.OPCODES["mark"], 1] \ + s.encode_literal("a") + [s.OPCODES["groupref"], 0, s.OPCODES["success"]] s.assert_match(opcodes, ["bab", "aaa", "dad"]) s.assert_no_match(opcodes, ["ba", "bad", "baad"]) def test_groupref_ignore(self): + s = self.s opcodes = [s.OPCODES["mark"], 0, s.OPCODES["any"], s.OPCODES["mark"], 1] \ + s.encode_literal("a") + [s.OPCODES["groupref_ignore"], 0, s.OPCODES["success"]] s.assert_match(opcodes, ["bab", "baB", "Dad"]) s.assert_no_match(opcodes, ["ba", "bad", "baad"]) def test_assert(self): + s = self.s opcodes = s.encode_literal("a") + [s.OPCODES["assert"], 4, 0] \ + s.encode_literal("b") + [s.OPCODES["success"], s.OPCODES["success"]] assert "a" == s.search(opcodes, "ab").group(0) s.assert_no_match(opcodes, ["a", "aa"]) def test_assert_not(self): + s = self.s opcodes = s.encode_literal("a") + [s.OPCODES["assert_not"], 4, 0] \ + s.encode_literal("b") + [s.OPCODES["success"], s.OPCODES["success"]] assert "a" == s.search(opcodes, "ac").group(0) _______________________________________________ pypy-commit mailing list pypy-commit@python.org http://mail.python.org/mailman/listinfo/pypy-commit