Author: Matti Picus <[email protected]>
Branch: code_page-utf8
Changeset: r98014:b27e7cddb1e0
Date: 2019-11-10 14:34 -0500
http://bitbucket.org/pypy/pypy/changeset/b27e7cddb1e0/

Log:    test, implement code page encoding/decoding via a new
        unicodehelper_win32.py

diff --git a/pypy/interpreter/unicodehelper.py 
b/pypy/interpreter/unicodehelper.py
--- a/pypy/interpreter/unicodehelper.py
+++ b/pypy/interpreter/unicodehelper.py
@@ -61,11 +61,10 @@
     state = space.fromcache(interp_codecs.CodecState)
     errorhandler=state.decode_error_handler
     if _WIN32:
+        import pypy.interpreter.unicodehelper_win32 as win32
         bytes = space.bytes_w(w_string)
         slen = len(bytes)
-        uni, lgt = runicode.str_decode_mbcs(bytes, slen, 'strict', final=True,
-                           errorhandler=errorhandler, force_ignore=False)
-        utf8 = uni.encode('utf-8')
+        utf8, _, lgt = str_decode_mbcs(bytes, 'strict', True, errorhandler)
     elif 0 and  _MACOSX:
         bytes = space.bytes_w(w_string)
         utf8, lgt, pos  = str_decode_utf8(bytes, 'surrogateescape', True,
@@ -362,29 +361,31 @@
     return result.build()
 
 if _WIN32:
+    import pypy.interpreter.unicodehelper_win32 as win32
     def utf8_encode_mbcs(s, errors, errorhandler, allow_surrogates=False):
-        res = rutf8.utf8_encode_mbcs(s, errors, errorhandler,
-                                     force_replace=False)
+        res = win32.utf8_encode_mbcs(s, errors, errorhandler)
         return res
 
-    def str_decode_mbcs(s, errors, final, errorhandler, force_ignore=True):
-        slen = len(s)
-        res, size = runicode.str_decode_mbcs(s, slen, errors, final=final,
-                           errorhandler=errorhandler, 
force_ignore=force_ignore)
-        res_utf8 = runicode.unicode_encode_utf_8(res, size, 'strict')
-        return res_utf8, len(res), size
-
-    def utf8_encode_code_page(s, errors, errorhandler, allow_surrogates=False):
-        pass
-
-    def str_decode_code_page(s, errors, final, errorhandler, 
force_ignore=True):
-        pass
+    def str_decode_mbcs(s, errors, final, errorhandler):
+        res, size = win32.str_decode_mbcs(s, errors, errorhandler, final=final)
+        return res, len(res), size
 
     def utf8_encode_oem(s, errors, errorhandler, allow_surrogates=False):
-        pass
+        res = win32.utf8_encode_oem(s, errors, errorhandler)
+        return res
 
-    def str_decode_oem(s, errors, final, errorhandler, force_ignore=True):
-        pass
+    def str_decode_oem(s, errors, final, errorhandler):
+        res, size = win32.str_decode_oem(s, errors, errorhandler, final)
+        return res, len(res), size
+
+    def utf8_encode_code_page(cp, s, errors, errorhandler, 
allow_surrogates=False):
+        res = win32.utf8_encode_code_page(cp, s, errors, errorhandler)
+        return res
+
+    def str_decode_code_page(cp, s, errors, final, errorhandler):
+        res, size = win32.str_decode_code_page(cp, s, errors, errorhandler, 
final)
+        return res, len(res), size
+
 
 def str_decode_utf8(s, errors, final, errorhandler, allow_surrogates=False):
     try:
diff --git a/pypy/interpreter/unicodehelper_win32.py 
b/pypy/interpreter/unicodehelper_win32.py
new file mode 100644
--- /dev/null
+++ b/pypy/interpreter/unicodehelper_win32.py
@@ -0,0 +1,202 @@
+from rpython.rtyper.lltypesystem import lltype, rffi
+from rpython.rlib.runicode import (BOOLP, WideCharToMultiByte,
+         MultiByteToWideChar)
+from rpython.rlib.rutf8 import (Utf8StringBuilder, Utf8StringIterator,
+         next_codepoint_pos)
+from rpython.rlib import rwin32
+
+def Py_UNICODE_HIGH_SURROGATE(ch):
+    """Return the UTF-16 high (lead) surrogate of code point ch as a UniChar.
+
+    Only called for code points >= 0x10000.
+    """
+    lead = 0xD800 - (0x10000 >> 10) + (ch >> 10)
+    return rffi.cast(lltype.UniChar, lead)
+
+def Py_UNICODE_LOW_SURROGATE(ch):
+    """Return the UTF-16 low (trail) surrogate of code point ch as a UniChar."""
+    # The lltype primitive is spelled ``UniChar``; ``lltype.uniChar`` does
+    # not exist and would fail as soon as this helper is called.
+    return rffi.cast(lltype.UniChar, 0xDC00 + ((ch) & 0x3FF))
+
+# NEED_RETRY is True when a C int is narrower than size_t.
+# NOTE(review): nothing in this patch reads NEED_RETRY; presumably it is
+# meant for chunking inputs longer than a C int can describe -- confirm.
+if rffi.sizeof(rffi.INT) < rffi.sizeof(rffi.SIZE_T):
+    NEED_RETRY = True
+else:
+    NEED_RETRY = False
+# Flag passed to WideCharToMultiByte when encoding to CP_UTF8
+# (see _encode_code_page_flags).
+WC_ERR_INVALID_CHARS = 0x0080
+
+# Codec names for the code pages that are not reported as "cp<number>"
+# (see _code_page_name).
+code_page_map = {
+        rwin32.CP_ACP: "mbcs",
+        rwin32.CP_UTF7:"CP_UTF7",
+        rwin32.CP_UTF8:"CP_UTF8",
+    }
+
+def _code_page_name(code_page):
+    """Return the codec name for code_page.
+
+    Uses the code_page_map entry when there is one, else "cp<number>".
+    """
+    fallback = "cp%d" % code_page
+    return code_page_map.get(code_page, fallback)
+
+def _decode_code_page_flags(code_page):
+    """Return the MultiByteToWideChar flags to use for code_page."""
+    if code_page != rwin32.CP_UTF7:
+        return rwin32.MB_ERR_INVALID_CHARS
+    # The CP_UTF7 decoder only supports flags==0
+    return 0
+
+def _encode_code_page_flags(code_page, errors):
+    """Return the WideCharToMultiByte flags for code_page and errors."""
+    if code_page == rwin32.CP_UTF8:
+        return WC_ERR_INVALID_CHARS
+    # CP_UTF7 always gets 0; any other code page gets 0 only for 'replace'
+    if code_page == rwin32.CP_UTF7 or errors == 'replace':
+        return 0
+    return rwin32.WC_NO_BEST_FIT_CHARS
+
+def _decode_cp_error(s, errorhandler, encoding, errors, start, end):
+    """Handle a failed MultiByteToWideChar call on s[start:end].
+
+    When the saved Windows error is ERROR_NO_UNICODE_TRANSLATION, the slice
+    is passed to the utf-8 slow-path decoder together with errors and
+    errorhandler, and (its utf8 result, end) is returned.  Any other saved
+    error is raised as a Windows error.  encoding is currently unused.
+    """
+    # late import to avoid circular import
+    from pypy.interpreter.unicodehelper import _str_decode_utf8_slowpath
+    if rwin32.GetLastError_saved() != rwin32.ERROR_NO_UNICODE_TRANSLATION:
+        raise rwin32.lastSavedWindowsError()
+    r, ignore1, ignore2 = _str_decode_utf8_slowpath(
+        s[start:end], errors, False, errorhandler, False)
+    return r, end
+
+def _unibuf_to_utf8(uni, insize):
+    """Encode the widechar unicode buffer uni to utf8.
+
+    insize is the character count handed to WideCharToMultiByte.  Returns
+    the utf8 byte string.  Should never error, since the buffer comes from
+    a call to MultiByteToWideChar; if WideCharToMultiByte nevertheless
+    returns 0, the saved Windows error is raised.
+    """
+    flags = 0
+    cp = rwin32.CP_UTF8
+    used_default_p = lltype.nullptr(BOOLP.TO)
+    assert uni is not None
+    with rffi.scoped_nonmoving_unicodebuffer(uni) as dataptr:
+        # first get the size of the result
+        outsize = WideCharToMultiByte(cp, flags, dataptr, insize,
+                                    None, 0, None, used_default_p)
+        if outsize == 0:
+            raise rwin32.lastSavedWindowsError()
+        with rffi.scoped_alloc_buffer(outsize) as buf:
+            # do the conversion
+            if WideCharToMultiByte(cp, flags, dataptr, insize, buf.raw,
+                    outsize, None, used_default_p) == 0:
+                raise rwin32.lastSavedWindowsError()
+            result = buf.str(outsize)
+            assert result is not None
+            return result
+
+def _decode_helper(cp, s, flags, encoding, errors, errorhandler, 
+                   start, end, res):
+    """Decode s[start:end] from code page cp and append the utf8 to res.
+
+    end is clamped to len(s).  If either MultiByteToWideChar call returns
+    0, the piece is handed to _decode_cp_error and its result is appended
+    instead.  Returns the position at which the caller should continue.
+    """
+    if end > len(s):
+        end = len(s)
+    piece = s[start:end]
+    with rffi.scoped_nonmovingbuffer(piece) as dataptr:
+        # first get the size of the result
+        outsize = MultiByteToWideChar(cp, flags, dataptr, len(piece),
+                                    lltype.nullptr(rffi.CWCHARP.TO), 0)
+        if outsize == 0:
+            r, pos = _decode_cp_error(s, errorhandler,
+                                               encoding, errors, start, end)
+            res.append(r)
+            return pos
+
+        with rffi.scoped_alloc_unicodebuffer(outsize) as buf:
+            # do the conversion
+            if MultiByteToWideChar(cp, flags, dataptr, len(piece),
+                                   buf.raw, outsize) == 0:
+                r, pos = _decode_cp_error(s, errorhandler,
+                                             encoding, errors, start, end)
+                res.append(r)
+                return pos
+            else:
+                # convert the wide-char result back to utf8 for the builder
+                res.append(_unibuf_to_utf8(buf.str(outsize), outsize))
+    return end
+
+def str_decode_code_page(cp, s, errors, errorhandler, final=False):
+    """Decodes a byte string s from a code page cp with an error handler.
+
+    With errors == 'strict' the whole string is decoded in one call;
+    otherwise it is decoded piece by piece so that the error handling can
+    be applied to each piece separately.
+    Returns utf8 result, original s length
+    NOTE(review): final is accepted but not used in this function.
+    """
+    insize = len(s)
+    if insize == 0:
+        return '', 0
+    flags = _decode_code_page_flags(cp)
+    encoding = _code_page_name(cp)
+    assert errorhandler is not None
+    res = Utf8StringBuilder(insize)
+    if errors == 'strict':
+        _decode_helper(cp, s, flags, encoding, errors, errorhandler,
+                       0, len(s), res)
+    else:
+        prev_pos = 0
+        pos = 0
+        while pos < len(s):
+            # NOTE(review): next_codepoint_pos comes from rutf8, so the piece
+            # boundary is presumably derived from utf-8 lead-byte rules and
+            # not from the code page's own multi-byte layout -- confirm that
+            # this is intended for double-byte code pages.
+            pos = next_codepoint_pos(s, prev_pos)
+            pos = _decode_helper(cp, s, flags, encoding,
+                            errors, errorhandler, prev_pos, pos, res)
+            prev_pos = pos
+    return res.build(), insize
+
+def str_decode_mbcs(s, errors, errorhandler, final=False):
+    """Decode s from the ANSI code page (CP_ACP)."""
+    code_page = rwin32.CP_ACP
+    return str_decode_code_page(code_page, s, errors, errorhandler, final)
+
+def str_decode_oem(s, errors, errorhandler, final=False):
+    """Decode s from the OEM code page (CP_OEMCP)."""
+    code_page = rwin32.CP_OEMCP
+    return str_decode_code_page(code_page, s, errors, errorhandler, final)
+
+def utf8_encode_code_page(cp, s, errors, errorhandler):
+    """Encode a utf8 string s using code page cp and the given
+    errors/errorhandler.
+
+    The string is converted one code point at a time; whenever
+    WideCharToMultiByte reports that it used the default character,
+    errorhandler is called and its replacement is appended instead.
+    Returns a encoded byte string.  Raises the saved Windows error when
+    WideCharToMultiByte itself returns 0.
+    """
+
+    name = _code_page_name(cp)
+    lgt = len(s)
+
+    if lgt == 0:
+        return ''
+    flags = _encode_code_page_flags(cp, errors)
+    if cp in (rwin32.CP_UTF8, rwin32.CP_UTF7):
+        # no "used default char" out-parameter is passed for these pages
+        used_default_p = lltype.nullptr(BOOLP.TO)
+    else:
+        used_default_p = lltype.malloc(BOOLP.TO, 1, flavor='raw')
+    # Encode one codepoint at a time to allow the errorhandlers to do
+    # their thing
+    chars = lltype.malloc(rffi.CWCHARP.TO, 2, flavor = 'raw')
+    res = Utf8StringBuilder(lgt)
+    try:
+        pos = 0
+        for uni in Utf8StringIterator(s):
+            if used_default_p:
+                used_default_p[0] = rffi.cast(rwin32.BOOL, False)
+            if uni < 0x10000:
+                chars[0] = rffi.cast(lltype.UniChar, uni)
+                charsize = 1
+            else:
+                # A surrogate pair needs both slots: the high surrogate in
+                # chars[0] and the low surrogate in chars[1].
+                chars[0] = Py_UNICODE_HIGH_SURROGATE(uni)
+                chars[1] = Py_UNICODE_LOW_SURROGATE(uni)
+                charsize = 2
+            # first get the size of the result
+            outsize = WideCharToMultiByte(cp, flags, chars, charsize, None, 0,
+                                           None, used_default_p)
+            if outsize == 0:
+                raise rwin32.lastSavedWindowsError()
+            # If we used a default char, then we failed!
+            if (used_default_p and rffi.cast(lltype.Bool, used_default_p[0])):
+                r, pos, rettype = errorhandler(errors, name,
+                                               "invalid character",
+                                               s, pos, pos + 1)
+                res.append(r)
+                continue
+            with rffi.scoped_alloc_buffer(outsize) as buf:
+                # do the conversion
+                if WideCharToMultiByte(cp, flags,
+                                       chars, charsize, buf.raw, outsize,
+                                       None, used_default_p) == 0:
+                    raise rwin32.lastSavedWindowsError()
+                if (used_default_p and
+                    rffi.cast(lltype.Bool, used_default_p[0])):
+                    # As in the size-query branch, the position returned by
+                    # the error handler is used as-is, not advanced again.
+                    r, pos, rettype = errorhandler(errors, name,
+                                                   "invalid character",
+                                                   s, pos, pos + 1)
+                    res.append(r)
+                else:
+                    res.append(buf.str(outsize))
+                    pos += 1
+        return res.build()
+    finally:
+        lltype.free(chars, flavor='raw')
+        if used_default_p:
+            lltype.free(used_default_p, flavor='raw')
+
+def utf8_encode_mbcs(s, errors, errorhandler):
+    """Encode the utf8 string s with the ANSI code page (CP_ACP)."""
+    code_page = rwin32.CP_ACP
+    return utf8_encode_code_page(code_page, s, errors, errorhandler)
+
+def utf8_encode_oem(s, errors, errorhandler):
+    """Encode the utf8 string s with the OEM code page (CP_OEMCP)."""
+    code_page = rwin32.CP_OEMCP
+    return utf8_encode_code_page(code_page, s, errors, errorhandler)
diff --git a/pypy/module/_codecs/interp_codecs.py 
b/pypy/module/_codecs/interp_codecs.py
--- a/pypy/module/_codecs/interp_codecs.py
+++ b/pypy/module/_codecs/interp_codecs.py
@@ -708,11 +708,41 @@
 if getattr(unicodehelper, '_WIN32', False):
     make_encoder_wrapper('mbcs_encode')
     make_decoder_wrapper('mbcs_decode')
-    make_encoder_wrapper('code_page_encode')
-    make_decoder_wrapper('code_page_decode')
     make_encoder_wrapper('oem_encode')
     make_decoder_wrapper('oem_decode')
 
+    # need to add the code_page argument
+
+    @unwrap_spec(code_page=int, errors='text_or_none')
+    def code_page_encode(space, code_page, w_arg, errors="strict"):
+        # Encode w_arg with the given code page and return the tuple
+        # (encoded bytes, w_arg._length).
+        # w_arg is a W_Unicode or W_Bytes?
+        w_arg = space.convert_arg_to_w_unicode(w_arg, errors)
+        if errors is None:
+            errors = 'strict'
+        # surrogates are only let through for 'surrogatepass'
+        allow_surrogates = (errors == 'surrogatepass')
+        handler = space.fromcache(CodecState).encode_error_handler
+        w_length = w_arg._length
+        encoded = unicodehelper.utf8_encode_code_page(
+            code_page, w_arg._utf8, errors, handler,
+            allow_surrogates=allow_surrogates)
+        w_bytes = space.newbytes(encoded)
+        return space.newtuple([w_bytes, space.newint(w_length)])
+
+    @unwrap_spec(code_page=int, string='bufferstr', errors='text_or_none',
+                 w_final=WrappedDefault(False))
+    def code_page_decode(space, code_page, string, errors="strict",
+                         w_final=None):
+        # Decode the byte string with the given code page and return the
+        # tuple (decoded unicode object, position reported by the decoder).
+        if errors is None:
+            errors = 'strict'
+        final = space.is_true(w_final)
+        state = space.fromcache(CodecState)
+        result, length, pos = unicodehelper.str_decode_code_page(code_page,
+                                   string, errors, final,
+                                   state.decode_error_handler)
+        # must return (unicode, pos), not bytes
+        # NOTE(review): length is whatever str_decode_code_page returns as
+        # its second item; newutf8 presumably expects the number of code
+        # points rather than the utf8 byte length -- confirm.
+        return space.newtuple([space.newutf8(result, length),
+                               space.newint(pos)])
+
+
 # utf-8 functions are not regular, because we have to pass
 # "allow_surrogates=False"
 @unwrap_spec(errors='text_or_none')
diff --git a/pypy/module/_codecs/moduledef.py b/pypy/module/_codecs/moduledef.py
--- a/pypy/module/_codecs/moduledef.py
+++ b/pypy/module/_codecs/moduledef.py
@@ -1,5 +1,6 @@
 from pypy.interpreter.mixedmodule import MixedModule
 from rpython.rlib.objectmodel import not_rpython
+from rpython.rlib import rwin32
 from pypy.module._codecs import interp_codecs
 
 class Module(MixedModule):
@@ -87,9 +88,8 @@
 
     @not_rpython
     def __init__(self, space, *args):
-        # mbcs codec is Windows specific, and based on rffi.
-        from rpython.rlib import runicode
-        if (hasattr(runicode, 'str_decode_mbcs')):
+        # mbcs codec is Windows specific, and based on rffi system calls.
+        if rwin32.WIN32:
             self.interpleveldefs['mbcs_encode'] = 'interp_codecs.mbcs_encode'
             self.interpleveldefs['oem_encode'] = 'interp_codecs.oem_encode'
             self.interpleveldefs['code_page_encode'] = 
'interp_codecs.code_page_encode'
diff --git a/pypy/module/_codecs/test/test_codecs.py 
b/pypy/module/_codecs/test/test_codecs.py
--- a/pypy/module/_codecs/test/test_codecs.py
+++ b/pypy/module/_codecs/test/test_codecs.py
@@ -432,6 +432,100 @@
             assert((b'aaaa' + seq + b'bbbb').decode('utf-8', 'ignore') ==
                           'aaaa' + res + 'bbbb')
 
+@pytest.mark.skipif(sys.platform != 'win32', reason='win32-only')
+class AppTestCodePage:
+    # App-level tests for _codecs.code_page_encode / code_page_decode,
+    # exercised with code pages 932 and 1252.
+    spaceconfig = {
+    }
+
+    def test_code_pages(self):
+        import _codecs as codecs
+        def check_decode(cp, test):
+            # test is (raw bytes, errors, expected text); expected None
+            # means the call must raise UnicodeDecodeError
+            raw, errors, expected  = test
+            if expected is not None:
+                try:
+                    decoded = codecs.code_page_decode(cp, raw, errors, True)
+                except UnicodeDecodeError as err:
+                    assert False, ('Unable to decode %a from "cp%s" with '
+                              'errors=%r: %s' % (raw, cp, errors, err))
+                assert decoded[0] == expected, (
+                    '%a.decode("cp%s", %r)=%a != %a'
+                    % (raw, cp, errors, decoded[0], expected))
+                assert decoded[1] >= 0
+                assert decoded[1] <= len(raw)
+            else:
+                raises(UnicodeDecodeError,
+                    codecs.code_page_decode, cp, raw, errors, True)
+
+        def check_encode(cp, test):
+            # test is (text, errors, expected bytes); expected None means
+            # the call must raise UnicodeEncodeError
+            text, errors, expected = test
+            if expected is not None:
+                try:
+                    encoded = codecs.code_page_encode(cp, text, errors)
+                except UnicodeEncodeError as err:
+                    assert False, ('Unable to encode %a to "cp%s" with '
+                              'errors=%r: %s' % (text, cp, errors, err))
+                assert encoded[0] == expected, (
+                    '%a.encode("cp%s", %r)=%a != %a'
+                    % (text, cp, errors, encoded[0], expected))
+                assert encoded[1] == len(text)
+            else:
+                raises(UnicodeEncodeError,
+                    codecs.code_page_encode, cp, text, errors)
+
+        for test in (
+                (u'abc', 'strict', b'abc'),
+                (u'\uff44\u9a3e', 'strict', b'\x82\x84\xe9\x80'),
+                # test error handlers
+                (u'\xff', 'strict', None),
+                (u'[\xff]', 'ignore', b'[]'),
+                (u'[\xff]', 'replace', b'[y]'),
+                (u'[\u20ac]', 'replace', b'[?]'),
+                (u'[\xff]', 'backslashreplace', b'[\\xff]'),
+                (u'[\xff]', 'namereplace',
+                 b'[\\N{LATIN SMALL LETTER Y WITH DIAERESIS}]'),
+                (u'[\xff]', 'xmlcharrefreplace', b'[&#255;]'),
+                (u'\udcff', 'strict', None),
+                (u'[\udcff]', 'surrogateescape', b'[\xff]'),
+                (u'[\udcff]', 'surrogatepass', None),
+            ):
+            check_encode(932, test)
+
+        for test in (
+                (b'abc', 'strict', u'abc'),
+                (b'\x82\x84\xe9\x80', 'strict', u'\uff44\u9a3e'),
+                # invalid bytes
+                (b'[\xff]', 'strict', None),
+                (b'[\xff]', 'ignore', u'[]'),
+                (b'[\xff]', 'replace', u'[\ufffd]'),
+                (b'[\xff]', 'backslashreplace', u'[\\xff]'),
+                (b'[\xff]', 'surrogateescape', u'[\udcff]'),
+                (b'[\xff]', 'surrogatepass', None),
+                (b'\x81\x00abc', 'strict', None),
+                (b'\x81\x00abc', 'ignore', u'\x00abc'),
+                (b'\x81\x00abc', 'replace', u'\ufffd\x00abc'),
+                (b'\x81\x00abc', 'backslashreplace', u'\\x81\x00abc'),
+            ):
+            check_decode(932, test)
+
+        for test in (
+                (u'abc', 'strict', b'abc'),
+                (u'\xe9\u20ac', 'strict',  b'\xe9\x80'),
+                (u'\xff', 'strict', b'\xff'),
+                # test error handlers
+                (u'\u0141', 'strict', None),
+                (u'\u0141', 'ignore', b''),
+                (u'\u0141', 'replace', b'L'),
+                (u'\udc98', 'surrogateescape', b'\x98'),
+                (u'\udc98', 'surrogatepass', None),
+            ):
+            check_encode(1252, test)
+
+        for test in (
+                (b'abc', 'strict', u'abc'),
+                (b'\xe9\x80', 'strict', u'\xe9\u20ac'),
+                (b'\xff', 'strict', u'\xff'),
+            ):
+            check_decode(1252, test)
+
+
 
 class AppTestPartialEvaluation:
     spaceconfig = dict(usemodules=['array',])
diff --git a/rpython/rlib/rwin32.py b/rpython/rlib/rwin32.py
--- a/rpython/rlib/rwin32.py
+++ b/rpython/rlib/rwin32.py
@@ -114,6 +114,7 @@
                        WC_NO_BEST_FIT_CHARS STD_INPUT_HANDLE STD_OUTPUT_HANDLE
                        STD_ERROR_HANDLE HANDLE_FLAG_INHERIT FILE_TYPE_CHAR
                        LOAD_WITH_ALTERED_SEARCH_PATH
+                       CP_ACP CP_UTF8 CP_UTF7 CP_OEMCP MB_ERR_INVALID_CHARS
                     """
         from rpython.translator.platform import host_factory
         static_platform = host_factory()
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to