Author: Matti Picus <[email protected]>
Branch: code_page-utf8
Changeset: r98014:b27e7cddb1e0
Date: 2019-11-10 14:34 -0500
http://bitbucket.org/pypy/pypy/changeset/b27e7cddb1e0/
Log: test, implement code page encoding/decoding via a new
unicodehelper_win32.py
diff --git a/pypy/interpreter/unicodehelper.py b/pypy/interpreter/unicodehelper.py
--- a/pypy/interpreter/unicodehelper.py
+++ b/pypy/interpreter/unicodehelper.py
@@ -61,11 +61,10 @@
state = space.fromcache(interp_codecs.CodecState)
errorhandler=state.decode_error_handler
if _WIN32:
+ import pypy.interpreter.unicodehelper_win32 as win32
bytes = space.bytes_w(w_string)
slen = len(bytes)
- uni, lgt = runicode.str_decode_mbcs(bytes, slen, 'strict', final=True,
- errorhandler=errorhandler, force_ignore=False)
- utf8 = uni.encode('utf-8')
+ utf8, _, lgt = str_decode_mbcs(bytes, 'strict', True, errorhandler)
elif 0 and _MACOSX:
bytes = space.bytes_w(w_string)
utf8, lgt, pos = str_decode_utf8(bytes, 'surrogateescape', True,
@@ -362,29 +361,31 @@
return result.build()
if _WIN32:
+ import pypy.interpreter.unicodehelper_win32 as win32
def utf8_encode_mbcs(s, errors, errorhandler, allow_surrogates=False):
- res = rutf8.utf8_encode_mbcs(s, errors, errorhandler,
- force_replace=False)
+ res = win32.utf8_encode_mbcs(s, errors, errorhandler)
return res
- def str_decode_mbcs(s, errors, final, errorhandler, force_ignore=True):
- slen = len(s)
- res, size = runicode.str_decode_mbcs(s, slen, errors, final=final,
- errorhandler=errorhandler, force_ignore=force_ignore)
- res_utf8 = runicode.unicode_encode_utf_8(res, size, 'strict')
- return res_utf8, len(res), size
-
- def utf8_encode_code_page(s, errors, errorhandler, allow_surrogates=False):
- pass
-
- def str_decode_code_page(s, errors, final, errorhandler, force_ignore=True):
- pass
+ def str_decode_mbcs(s, errors, final, errorhandler):
+ res, size = win32.str_decode_mbcs(s, errors, errorhandler, final=final)
+ return res, len(res), size
def utf8_encode_oem(s, errors, errorhandler, allow_surrogates=False):
- pass
+ res = win32.utf8_encode_oem(s, errors, errorhandler)
+ return res
- def str_decode_oem(s, errors, final, errorhandler, force_ignore=True):
- pass
+ def str_decode_oem(s, errors, final, errorhandler):
+ res, size = win32.str_decode_oem(s, errors, errorhandler, final)
+ return res, len(res), size
+
+ def utf8_encode_code_page(cp, s, errors, errorhandler, allow_surrogates=False):
+ res = win32.utf8_encode_code_page(cp, s, errors, errorhandler)
+ return res
+
+ def str_decode_code_page(cp, s, errors, final, errorhandler):
+ res, size = win32.str_decode_code_page(cp, s, errors, errorhandler, final)
+ return res, len(res), size
+
def str_decode_utf8(s, errors, final, errorhandler, allow_surrogates=False):
try:
diff --git a/pypy/interpreter/unicodehelper_win32.py b/pypy/interpreter/unicodehelper_win32.py
new file mode 100644
--- /dev/null
+++ b/pypy/interpreter/unicodehelper_win32.py
@@ -0,0 +1,202 @@
+from rpython.rtyper.lltypesystem import lltype, rffi
+from rpython.rlib.runicode import (BOOLP, WideCharToMultiByte,
+ MultiByteToWideChar)
+from rpython.rlib.rutf8 import (Utf8StringBuilder, Utf8StringIterator,
+ next_codepoint_pos)
+from rpython.rlib import rwin32
+
+def Py_UNICODE_HIGH_SURROGATE(ch):
+ return rffi.cast(lltype.UniChar, 0xD800 - (0x10000 >> 10) + ((ch) >> 10))
+
+def Py_UNICODE_LOW_SURROGATE(ch):
+ return rffi.cast(lltype.UniChar, 0xDC00 + ((ch) & 0x3FF))  # low (trailing) UTF-16 surrogate of ch
+
+if rffi.sizeof(rffi.INT) < rffi.sizeof(rffi.SIZE_T):
+ NEED_RETRY = True
+else:
+ NEED_RETRY = False
+WC_ERR_INVALID_CHARS = 0x0080
+
+code_page_map = {
+ rwin32.CP_ACP: "mbcs",
+ rwin32.CP_UTF7:"CP_UTF7",
+ rwin32.CP_UTF8:"CP_UTF8",
+ }
+
+def _code_page_name(code_page):
+ return code_page_map.get(code_page, "cp%d" % code_page)
+
+def _decode_code_page_flags(code_page):
+ if code_page == rwin32.CP_UTF7:
+ # The CP_UTF7 decoder only supports flags==0
+ return 0
+ return rwin32.MB_ERR_INVALID_CHARS
+
+def _encode_code_page_flags(code_page, errors):
+ if code_page == rwin32.CP_UTF8:
+ return WC_ERR_INVALID_CHARS
+ elif code_page == rwin32.CP_UTF7:
+ return 0
+ if errors == 'replace':
+ return 0
+ return rwin32.WC_NO_BEST_FIT_CHARS
+
+def _decode_cp_error(s, errorhandler, encoding, errors, start, end):
+ # late import to avoid circular import
+ from pypy.interpreter.unicodehelper import _str_decode_utf8_slowpath
+ if rwin32.GetLastError_saved() == rwin32.ERROR_NO_UNICODE_TRANSLATION:
+ msg = ("No mapping for the Unicode character exists in the target "
+ "multi-byte code page.")
+ r, ignore1, ignore2 = _str_decode_utf8_slowpath(s[start:end], errors, False, errorhandler, False)
+ return r, end
+ else:
+ raise rwin32.lastSavedWindowsError()
+
+def _unibuf_to_utf8(uni, insize):
+ """Encode the widechar unicode buffer uni to utf8
+ Should never error, since the buffer comes from a call to
+ MultiByteToWideChar
+ """
+ flags = 0
+ cp = rwin32.CP_UTF8
+ used_default_p = lltype.nullptr(BOOLP.TO)
+ assert uni is not None
+ with rffi.scoped_nonmoving_unicodebuffer(uni) as dataptr:
+ # first get the size of the result
+ outsize = WideCharToMultiByte(cp, flags, dataptr, insize,
+ None, 0, None, used_default_p)
+ if outsize == 0:
+ raise rwin32.lastSavedWindowsError()
+ with rffi.scoped_alloc_buffer(outsize) as buf:
+ # do the conversion
+ if WideCharToMultiByte(cp, flags, dataptr, insize, buf.raw,
+ outsize, None, used_default_p) == 0:
+ raise rwin32.lastSavedWindowsError()
+ result = buf.str(outsize)
+ assert result is not None
+ return result
+
+def _decode_helper(cp, s, flags, encoding, errors, errorhandler,
+ start, end, res):
+ if end > len(s):
+ end = len(s)
+ piece = s[start:end]
+ with rffi.scoped_nonmovingbuffer(piece) as dataptr:
+ # first get the size of the result
+ outsize = MultiByteToWideChar(cp, flags, dataptr, len(piece),
+ lltype.nullptr(rffi.CWCHARP.TO), 0)
+ if outsize == 0:
+ r, pos = _decode_cp_error(s, errorhandler,
+ encoding, errors, start, end)
+ res.append(r)
+ return pos
+
+ with rffi.scoped_alloc_unicodebuffer(outsize) as buf:
+ # do the conversion
+ if MultiByteToWideChar(cp, flags, dataptr, len(piece),
+ buf.raw, outsize) == 0:
+ r, pos = _decode_cp_error(s, errorhandler,
+ encoding, errors, start, end)
+ res.append(r)
+ return pos
+ else:
+ res.append(_unibuf_to_utf8(buf.str(outsize), outsize))
+ return end
+
+def str_decode_code_page(cp, s, errors, errorhandler, final=False):
+ """Decodes a byte string s from a code page cp with an error handler.
+ Returns utf8 result, original s length
+ """
+ insize = len(s)
+ if insize == 0:
+ return '', 0
+ flags = _decode_code_page_flags(cp)
+ encoding = _code_page_name(cp)
+ assert errorhandler is not None
+ res = Utf8StringBuilder(insize)
+ if errors == 'strict':
+ _decode_helper(cp, s, flags, encoding, errors, errorhandler,
+ 0, len(s), res)
+ else:
+ prev_pos = 0
+ pos = 0
+ while pos < len(s):
+ pos = next_codepoint_pos(s, prev_pos)
+ pos = _decode_helper(cp, s, flags, encoding,
+ errors, errorhandler, prev_pos, pos, res)
+ prev_pos = pos
+ return res.build(), insize
+
+def str_decode_mbcs(s, errors, errorhandler, final=False):
+ return str_decode_code_page(rwin32.CP_ACP, s, errors, errorhandler, final)
+
+def str_decode_oem(s, errors, errorhandler, final=False):
+ return str_decode_code_page(rwin32.CP_OEMCP, s, errors, errorhandler, final)
+
+def utf8_encode_code_page(cp, s, errors, errorhandler):
+ """Encode a utf8 string s using code page cp and the given
+ errors/errorhandler.
+ Returns an encoded byte string ('' when s is empty)
+ """
+
+ name = _code_page_name(cp)
+ lgt = len(s)
+
+ if lgt == 0:
+ return ''
+ flags = _encode_code_page_flags(cp, errors)
+ if cp in (rwin32.CP_UTF8, rwin32.CP_UTF7):
+ used_default_p = lltype.nullptr(BOOLP.TO)
+ else:
+ used_default_p = lltype.malloc(BOOLP.TO, 1, flavor='raw')
+ # Encode one codepoint at a time to allow the errorhandlers to do
+ # their thing
+ chars = lltype.malloc(rffi.CWCHARP.TO, 2, flavor = 'raw')
+ res = Utf8StringBuilder(lgt)
+ try:
+ pos = 0
+ for uni in Utf8StringIterator(s):
+ if used_default_p:
+ used_default_p[0] = rffi.cast(rwin32.BOOL, False)
+ if uni < 0x10000:
+ chars[0] = rffi.cast(lltype.UniChar, uni)
+ charsize = 1
+ else:
+ chars[0] = Py_UNICODE_HIGH_SURROGATE(uni)
+ chars[1] = Py_UNICODE_LOW_SURROGATE(uni)  # second UTF-16 unit; charsize == 2 reads chars[0..1]
+ charsize = 2
+ # first get the size of the result
+ outsize = WideCharToMultiByte(cp, flags, chars, charsize, None, 0,
+ None, used_default_p)
+ if outsize == 0:
+ raise rwin32.lastSavedWindowsError()
+ # If we used a default char, then we failed!
+ if (used_default_p and rffi.cast(lltype.Bool, used_default_p[0])):
+ r, pos, rettype = errorhandler(errors, name, "invalid character", s, pos, pos+1)
+ res.append(r)
+ continue
+ with rffi.scoped_alloc_buffer(outsize) as buf:
+ # do the conversion
+ if WideCharToMultiByte(cp, flags,
+ chars, charsize, buf.raw, outsize,
+ None, used_default_p) == 0:
+ raise rwin32.lastSavedWindowsError()
+ if (used_default_p and
+ rffi.cast(lltype.Bool, used_default_p[0])):
+ r, pos, rettype = errorhandler(errors, name, "invalid character",
+ s, pos, pos + 1)
+ res.append(r)
+ else:
+ res.append(buf.str(outsize))
+ pos += 1
+ return res.build()
+ finally:
+ lltype.free(chars, flavor='raw')
+ if used_default_p:
+ lltype.free(used_default_p, flavor='raw')
+
+def utf8_encode_mbcs(s, errors, errorhandler):
+ return utf8_encode_code_page(rwin32.CP_ACP, s, errors, errorhandler)
+
+def utf8_encode_oem(s, errors, errorhandler):
+ return utf8_encode_code_page(rwin32.CP_OEMCP, s, errors, errorhandler)
diff --git a/pypy/module/_codecs/interp_codecs.py b/pypy/module/_codecs/interp_codecs.py
--- a/pypy/module/_codecs/interp_codecs.py
+++ b/pypy/module/_codecs/interp_codecs.py
@@ -708,11 +708,41 @@
if getattr(unicodehelper, '_WIN32', False):
make_encoder_wrapper('mbcs_encode')
make_decoder_wrapper('mbcs_decode')
- make_encoder_wrapper('code_page_encode')
- make_decoder_wrapper('code_page_decode')
make_encoder_wrapper('oem_encode')
make_decoder_wrapper('oem_decode')
+ # need to add the code_page argument
+
+ @unwrap_spec(code_page=int, errors='text_or_none')
+ def code_page_encode(space, code_page, w_arg, errors="strict"):
+ # w_arg is a W_Unicode or W_Bytes?
+ w_arg = space.convert_arg_to_w_unicode(w_arg, errors)
+ if errors is None:
+ errors = 'strict'
+ allow_surrogates = False
+ if errors in ('surrogatepass',):
+ allow_surrogates = True
+ state = space.fromcache(CodecState)
+ ulen = w_arg._length
+ result = unicodehelper.utf8_encode_code_page(code_page, w_arg._utf8,
+ errors, state.encode_error_handler,
+ allow_surrogates=allow_surrogates)
+ return space.newtuple([space.newbytes(result), space.newint(ulen)])
+
+ @unwrap_spec(code_page=int, string='bufferstr', errors='text_or_none',
+ w_final=WrappedDefault(False))
+ def code_page_decode(space, code_page, string, errors="strict", w_final=None):
+ if errors is None:
+ errors = 'strict'
+ final = space.is_true(w_final)
+ state = space.fromcache(CodecState)
+ result, length, pos = unicodehelper.str_decode_code_page(code_page,
+ string, errors, final,
+ state.decode_error_handler)
+ # must return (unicode, pos): the decoded text, not bytes
+ return space.newtuple([space.newutf8(result, length), space.newint(pos)])
+
+
# utf-8 functions are not regular, because we have to pass
# "allow_surrogates=False"
@unwrap_spec(errors='text_or_none')
diff --git a/pypy/module/_codecs/moduledef.py b/pypy/module/_codecs/moduledef.py
--- a/pypy/module/_codecs/moduledef.py
+++ b/pypy/module/_codecs/moduledef.py
@@ -1,5 +1,6 @@
from pypy.interpreter.mixedmodule import MixedModule
from rpython.rlib.objectmodel import not_rpython
+from rpython.rlib import rwin32
from pypy.module._codecs import interp_codecs
class Module(MixedModule):
@@ -87,9 +88,8 @@
@not_rpython
def __init__(self, space, *args):
- # mbcs codec is Windows specific, and based on rffi.
- from rpython.rlib import runicode
- if (hasattr(runicode, 'str_decode_mbcs')):
+ # mbcs codec is Windows specific, and based on rffi system calls.
+ if rwin32.WIN32:
self.interpleveldefs['mbcs_encode'] = 'interp_codecs.mbcs_encode'
self.interpleveldefs['oem_encode'] = 'interp_codecs.oem_encode'
self.interpleveldefs['code_page_encode'] = 'interp_codecs.code_page_encode'
diff --git a/pypy/module/_codecs/test/test_codecs.py b/pypy/module/_codecs/test/test_codecs.py
--- a/pypy/module/_codecs/test/test_codecs.py
+++ b/pypy/module/_codecs/test/test_codecs.py
@@ -432,6 +432,100 @@
assert((b'aaaa' + seq + b'bbbb').decode('utf-8', 'ignore') ==
'aaaa' + res + 'bbbb')
+@pytest.mark.skipif(sys.platform != 'win32', reason='win32-only')
+class AppTestCodePage:
+ spaceconfig = {
+ }
+
+ def test_code_pages(self):
+ import _codecs as codecs
+ def check_decode(cp, test):
+ raw, errors, expected = test
+ if expected is not None:
+ try:
+ decoded = codecs.code_page_decode(cp, raw, errors, True)
+ except UnicodeDecodeError as err:
+ assert False, ('Unable to decode %a from "cp%s" with '
+ 'errors=%r: %s' % (raw, cp, errors, err))
+ assert decoded[0] == expected, ('%a.decode("cp%s", %r)=%a != %a'
+ % (raw, cp, errors, decoded[0], expected))
+ assert decoded[1] >= 0
+ assert decoded[1] <= len(raw)
+ else:
+ raises(UnicodeDecodeError,
+ codecs.code_page_decode, cp, raw, errors, True)
+
+ def check_encode(cp, test):
+ text, errors, expected = test
+ if expected is not None:
+ try:
+ encoded = codecs.code_page_encode(cp, text, errors)
+ except UnicodeEncodeError as err:
+ assert False, ('Unable to encode %a to "cp%s" with '
+ 'errors=%r: %s' % (text, cp, errors, err))
+ assert encoded[0] == expected, ('%a.encode("cp%s", %r)=%a != %a'
+ % (text, cp, errors, encoded[0], expected))
+ assert encoded[1] == len(text)
+ else:
+ raises(UnicodeEncodeError,
+ codecs.code_page_encode, cp, text, errors)
+
+ for test in (
+ (u'abc', 'strict', b'abc'),
+ (u'\uff44\u9a3e', 'strict', b'\x82\x84\xe9\x80'),
+ # test error handlers
+ (u'\xff', 'strict', None),
+ (u'[\xff]', 'ignore', b'[]'),
+ (u'[\xff]', 'replace', b'[y]'),
+ (u'[\u20ac]', 'replace', b'[?]'),
+ (u'[\xff]', 'backslashreplace', b'[\\xff]'),
+ (u'[\xff]', 'namereplace',
+ b'[\\N{LATIN SMALL LETTER Y WITH DIAERESIS}]'),
+ (u'[\xff]', 'xmlcharrefreplace', b'[&#255;]'),
+ (u'\udcff', 'strict', None),
+ (u'[\udcff]', 'surrogateescape', b'[\xff]'),
+ (u'[\udcff]', 'surrogatepass', None),
+ ):
+ check_encode(932, test)
+
+ for test in (
+ (b'abc', 'strict', u'abc'),
+ (b'\x82\x84\xe9\x80', 'strict', u'\uff44\u9a3e'),
+ # invalid bytes
+ (b'[\xff]', 'strict', None),
+ (b'[\xff]', 'ignore', u'[]'),
+ (b'[\xff]', 'replace', u'[\ufffd]'),
+ (b'[\xff]', 'backslashreplace', u'[\\xff]'),
+ (b'[\xff]', 'surrogateescape', u'[\udcff]'),
+ (b'[\xff]', 'surrogatepass', None),
+ (b'\x81\x00abc', 'strict', None),
+ (b'\x81\x00abc', 'ignore', u'\x00abc'),
+ (b'\x81\x00abc', 'replace', u'\ufffd\x00abc'),
+ (b'\x81\x00abc', 'backslashreplace', u'\\x81\x00abc'),
+ ):
+ check_decode(932, test)
+
+ for test in (
+ (u'abc', 'strict', b'abc'),
+ (u'\xe9\u20ac', 'strict', b'\xe9\x80'),
+ (u'\xff', 'strict', b'\xff'),
+ # test error handlers
+ (u'\u0141', 'strict', None),
+ (u'\u0141', 'ignore', b''),
+ (u'\u0141', 'replace', b'L'),
+ (u'\udc98', 'surrogateescape', b'\x98'),
+ (u'\udc98', 'surrogatepass', None),
+ ):
+ check_encode(1252, test)
+
+ for test in (
+ (b'abc', 'strict', u'abc'),
+ (b'\xe9\x80', 'strict', u'\xe9\u20ac'),
+ (b'\xff', 'strict', u'\xff'),
+ ):
+ check_decode(1252, test)
+
+
class AppTestPartialEvaluation:
spaceconfig = dict(usemodules=['array',])
diff --git a/rpython/rlib/rwin32.py b/rpython/rlib/rwin32.py
--- a/rpython/rlib/rwin32.py
+++ b/rpython/rlib/rwin32.py
@@ -114,6 +114,7 @@
WC_NO_BEST_FIT_CHARS STD_INPUT_HANDLE STD_OUTPUT_HANDLE
STD_ERROR_HANDLE HANDLE_FLAG_INHERIT FILE_TYPE_CHAR
LOAD_WITH_ALTERED_SEARCH_PATH
+ CP_ACP CP_UTF8 CP_UTF7 CP_OEMCP MB_ERR_INVALID_CHARS
"""
from rpython.translator.platform import host_factory
static_platform = host_factory()
_______________________________________________
pypy-commit mailing list
pypy-commit@python.org
https://mail.python.org/mailman/listinfo/pypy-commit