Author: Matti Picus <[email protected]> Branch: unicode-utf8-py3 Changeset: r94752:3f63d5b725cc Date: 2018-06-10 21:44 -0700 http://bitbucket.org/pypy/pypy/changeset/3f63d5b725cc/
Log: merge unicode-utf8 into branch, probably many mistakes in merge diff too long, truncating to 2000 out of 13167 lines diff --git a/TODO b/TODO new file mode 100644 --- /dev/null +++ b/TODO @@ -0,0 +1,6 @@ +* find a better way to run "find" without creating the index storage, + if one is not already readily available +* write the correct jit_elidable in _get_index_storage +* improve performance of splitlines +* fix _pypyjson to not use a wrapped dict when decoding an object +* make sure we review all the places that call ord(unichr) to check for ValueErrors diff --git a/pypy/doc/whatsnew-head.rst b/pypy/doc/whatsnew-head.rst --- a/pypy/doc/whatsnew-head.rst +++ b/pypy/doc/whatsnew-head.rst @@ -28,6 +28,10 @@ The reverse-debugger branch has been merged. For more information, see https://bitbucket.org/pypy/revdb +.. branch: unicode-utf8-re +.. branch: utf8-io + +Utf8 handling for unicode .. branch: pyparser-improvements-3 diff --git a/pypy/interpreter/astcompiler/astbuilder.py b/pypy/interpreter/astcompiler/astbuilder.py --- a/pypy/interpreter/astcompiler/astbuilder.py +++ b/pypy/interpreter/astcompiler/astbuilder.py @@ -58,6 +58,7 @@ self.space = space self.compile_info = compile_info self.root_node = n + # used in f-strings self.recursive_parser = recursive_parser def build_ast(self): diff --git a/pypy/interpreter/astcompiler/test/test_compiler.py b/pypy/interpreter/astcompiler/test/test_compiler.py --- a/pypy/interpreter/astcompiler/test/test_compiler.py +++ b/pypy/interpreter/astcompiler/test/test_compiler.py @@ -1264,9 +1264,6 @@ class AppTestCompiler: - def setup_class(cls): - cls.w_maxunicode = cls.space.wrap(sys.maxunicode) - def test_docstring_not_loaded(self): import io, dis, sys ns = {} diff --git a/pypy/interpreter/astcompiler/validate.py b/pypy/interpreter/astcompiler/validate.py --- a/pypy/interpreter/astcompiler/validate.py +++ b/pypy/interpreter/astcompiler/validate.py @@ -409,7 +409,7 @@ def visit_Str(self, node): space = self.space w_type = space.type(node.s) - if w_type != space.w_unicode: + if w_type != space.w_str: raise oefmt(space.w_TypeError, "non-string type in Str") def visit_Bytes(self, node): diff --git a/pypy/interpreter/baseobjspace.py b/pypy/interpreter/baseobjspace.py --- a/pypy/interpreter/baseobjspace.py +++ b/pypy/interpreter/baseobjspace.py @@ -3,7 +3,7 @@ from rpython.rlib.cache import Cache from rpython.tool.uid import HUGEVAL_BYTES -from rpython.rlib import jit, types +from rpython.rlib import jit, types, rutf8 from rpython.rlib.debug import make_sure_not_resized from rpython.rlib.objectmodel import (we_are_translated, newlist_hint, compute_unique_id, specialize, not_rpython) @@ -251,6 +251,12 @@ def text_w(self, space): self._typed_unwrap_error(space, "string") + def utf8_w(self, space): + self._typed_unwrap_error(space, "unicode") + + def convert_to_w_unicode(self, space): + self._typed_unwrap_error(space, "unicode") + def bytearray_list_of_chars_w(self, space): self._typed_unwrap_error(space, "bytearray") @@ -1066,7 +1072,7 @@ """ return None - def listview_unicode(self, w_list): + def listview_utf8(self, w_list): """ Return a list of unwrapped unicode out of a list of unicode. If the argument is not a list or does not contain only unicode, return None. May return None anyway. @@ -1096,8 +1102,15 @@ def newlist_bytes(self, list_s): return self.newlist([self.newbytes(s) for s in list_s]) - def newlist_unicode(self, list_u): - return self.newlist([self.newunicode(u) for u in list_u]) + def newlist_utf8(self, list_u, is_ascii): + l_w = [None] * len(list_u) + for i, item in enumerate(list_u): + if not is_ascii: + length = rutf8.check_utf8(item, True) + else: + length = len(item) + l_w[i] = self.newutf8(item, length) + return self.newlist(l_w) def newlist_int(self, list_i): return self.newlist([self.newint(i) for i in list_i]) @@ -1702,15 +1715,16 @@ assert w_obj is not None return w_obj.float_w(self, allow_conversion) - @specialize.argtype(1) - def unicode_w(self, w_obj): - assert w_obj is not None - return w_obj.unicode_w(self) + def utf8_w(self, w_obj): + return w_obj.utf8_w(self) + + def convert_to_w_unicode(self, w_obj): + return w_obj.convert_to_w_unicode(self) def unicode0_w(self, w_obj): "Like unicode_w, but rejects strings with NUL bytes." from rpython.rlib import rstring - result = w_obj.unicode_w(self) + result = w_obj.utf8_w(self).decode('utf8') if u'\x00' in result: raise oefmt(self.w_ValueError, "argument must be a unicode string without NUL " @@ -1733,6 +1747,23 @@ w_obj = self.fsencode(w_obj) return self.bytesbuf0_w(w_obj) + def convert_arg_to_w_unicode(self, w_obj, strict=None): + # XXX why convert_to_w_unicode does something slightly different? + from pypy.objspace.std.unicodeobject import W_UnicodeObject + assert not hasattr(self, 'is_fake_objspace') + return W_UnicodeObject.convert_arg_to_w_unicode(self, w_obj, strict) + + def utf8_len_w(self, w_obj): + w_obj = self.convert_arg_to_w_unicode(w_obj) + return w_obj._utf8, w_obj._len() + + def realutf8_w(self, w_obj): + # Like utf8_w(), but only works if w_obj is really of type + # 'unicode'. On Python 3 this is the same as utf8_w(). + if not self.isinstance_w(w_obj, self.w_unicode): + raise oefmt(self.w_TypeError, "argument must be a unicode") + return self.utf8_w(w_obj) + def bytesbuf0_w(self, w_obj): # Like bytes0_w(), but also accept a read-only buffer. from rpython.rlib import rstring @@ -2078,7 +2109,7 @@ 'float_w', 'uint_w', 'bigint_w', - 'unicode_w', + 'utf8_w', 'unwrap', 'is_true', 'is_w', diff --git a/pypy/interpreter/gateway.py b/pypy/interpreter/gateway.py --- a/pypy/interpreter/gateway.py +++ b/pypy/interpreter/gateway.py @@ -174,6 +174,9 @@ def visit_unicode(self, el, app_sig): self.checked_space_method(el, app_sig) + def visit_utf8(self, el, app_sig): + self.checked_space_method(el, app_sig) + def visit_fsencode(self, el, app_sig): self.checked_space_method(el, app_sig) @@ -326,6 +329,9 @@ def visit_unicode(self, typ): self.run_args.append("space.unicode_w(%s)" % (self.scopenext(),)) + def visit_utf8(self, typ): + self.run_args.append("space.utf8_w(%s)" % (self.scopenext(),)) + def visit_fsencode(self, typ): self.run_args.append("space.fsencode_w(%s)" % (self.scopenext(),)) @@ -497,6 +503,9 @@ def visit_text0(self, typ): self.unwrap.append("space.text0_w(%s)" % (self.nextarg(),)) + def visit_utf8(self, typ): + self.unwrap.append("space.utf8_w(%s)" % (self.nextarg(),)) + def visit_fsencode(self, typ): self.unwrap.append("space.fsencode_w(%s)" % (self.nextarg(),)) diff --git a/pypy/interpreter/pyparser/parsestring.py b/pypy/interpreter/pyparser/parsestring.py --- a/pypy/interpreter/pyparser/parsestring.py +++ b/pypy/interpreter/pyparser/parsestring.py @@ -1,4 +1,5 @@ # coding: utf-8 +from rpython.rlib import rutf8 from pypy.interpreter.baseobjspace import W_Root from pypy.interpreter.error import OperationError, oefmt from pypy.interpreter import unicodehelper @@ -91,9 +92,11 @@ if encoding is None: substr = s[ps:q] else: + unicodehelper.check_utf8_or_raise(space, s, ps, q) substr = decode_unicode_utf8(space, s, ps, q) - v = unicodehelper.decode_unicode_escape(space, substr) - return space.newunicode(v) + r = unicodehelper.decode_unicode_escape(space, substr) + v, length = r + return space.newutf8(v, length) assert 0 <= ps <= q substr = s[ps : q] @@ -135,15 +138,12 @@ # the backslash we just wrote, we emit "\u005c" # instead. lis.append("u005c") - if ord(s[ps]) & 0x80: # XXX inefficient - w, ps = decode_utf8(space, s, ps, end) - for c in w: - # The equivalent of %08x, which is not supported by RPython. - # 7 zeroes are enough for the unicode range, and the - # result still fits in 32-bit. - hexa = hex(ord(c) + 0x10000000) - lis.append('\\U0') - lis.append(hexa[3:]) # Skip 0x and the leading 1 + if ord(s[ps]) & 0x80: + cp = rutf8.codepoint_at_pos(s, ps) + hexa = hex(cp + 0x10000000) + lis.append('\\U0') + lis.append(hexa[3:]) # Skip 0x and the leading 1 + ps = rutf8.next_codepoint_pos(s, ps) else: lis.append(s[ps]) ps += 1 @@ -250,20 +250,29 @@ ch >= 'A' and ch <= 'F') -def decode_utf8(space, s, ps, end): +def check_utf8(space, s, ps, end): assert ps >= 0 pt = ps # while (s < end && *s != '\\') s++; */ /* inefficient for u".." while ps < end and ord(s[ps]) & 0x80: ps += 1 - u = unicodehelper.decode_utf8(space, s[pt:ps]) - return u, ps + try: + rutf8.check_utf8(s, True, pt, ps) + except rutf8.CheckError as e: + lgt, flag = rutf8.check_utf8(s, True, pt, e.pos) + unicodehelper.decode_error_handler(space)('strict', 'utf8', + 'invalid utf-8', s, pt + lgt, pt + lgt + 1) + return s[pt:ps] def decode_utf8_recode(space, s, ps, end, recode_encoding): - u, ps = decode_utf8(space, s, ps, end) - w_v = unicodehelper.encode(space, space.newunicode(u), recode_encoding) + p = ps + while p < end and ord(s[p]) & 0x80: + p += 1 + lgt = unicodehelper.check_utf8_or_raise(space, s, ps, p) + w_v = unicodehelper.encode(space, space.newutf8(s[ps:p], lgt), + recode_encoding) v = space.bytes_w(w_v) - return v, ps + return v, p def raise_app_valueerror(space, msg): raise OperationError(space.w_ValueError, space.newtext(msg)) diff --git a/pypy/interpreter/pyparser/test/test_parsestring.py b/pypy/interpreter/pyparser/test/test_parsestring.py --- a/pypy/interpreter/pyparser/test/test_parsestring.py +++ b/pypy/interpreter/pyparser/test/test_parsestring.py @@ -10,7 +10,7 @@ assert space.bytes_w(w_ret) == value elif isinstance(value, unicode): assert space.type(w_ret) == space.w_unicode - assert space.unicode_w(w_ret) == value + assert space.utf8_w(w_ret).decode('utf8') == value else: assert False @@ -61,7 +61,7 @@ s = "u'\x81'" s = s.decode("koi8-u").encode("utf8")[1:] w_ret = parsestring.parsestr(self.space, 'koi8-u', s) - ret = space.unwrap(w_ret) + ret = w_ret._utf8.decode('utf8') assert ret == eval("# -*- coding: koi8-u -*-\nu'\x81'") def test_unicode_pep414(self): @@ -131,7 +131,4 @@ def test_decode_unicode_utf8(self): buf = parsestring.decode_unicode_utf8(self.space, 'u"\xf0\x9f\x92\x8b"', 2, 6) - if sys.maxunicode == 65535: - assert buf == r"\U0000d83d\U0000dc8b" - else: - assert buf == r"\U0001f48b" + assert buf == r"\U0001f48b" diff --git a/pypy/interpreter/test/test_gateway.py b/pypy/interpreter/test/test_gateway.py --- a/pypy/interpreter/test/test_gateway.py +++ b/pypy/interpreter/test/test_gateway.py @@ -555,25 +555,32 @@ w_app_g3_r = space.wrap(app_g3_r) space.raises_w(space.w_TypeError, space.call_function,w_app_g3_r,w(1.0)) - def test_interp2app_unwrap_spec_unicode(self): + def test_interp2app_unwrap_spec_utf8(self): space = self.space w = space.wrap - def g3_u(space, uni): - return space.wrap(len(uni)) + def g3_u(space, utf8): + return space.wrap(utf8) app_g3_u = gateway.interp2app_temp(g3_u, unwrap_spec=[gateway.ObjSpace, - unicode]) + 'utf8']) w_app_g3_u = space.wrap(app_g3_u) + encoded = u"gęść".encode('utf8') assert self.space.eq_w( - space.call_function(w_app_g3_u, w(u"foo")), - w(3)) + space.call_function(w_app_g3_u, w(u"gęść")), + w(encoded)) assert self.space.eq_w( - space.call_function(w_app_g3_u, w("baz")), - w(3)) + space.call_function(w_app_g3_u, w("foo")), + w("foo")) space.raises_w(space.w_TypeError, space.call_function, w_app_g3_u, w(None)) space.raises_w(space.w_TypeError, space.call_function, w_app_g3_u, w(42)) + w_ascii = space.appexec([], """(): + import sys + return sys.getdefaultencoding() == 'ascii'""") + if space.is_true(w_ascii): + raises(gateway.OperationError, space.call_function, w_app_g3_u, + w("\x80")) def test_interp2app_unwrap_spec_unwrapper(self): space = self.space diff --git a/pypy/interpreter/test/test_objspace.py b/pypy/interpreter/test/test_objspace.py --- a/pypy/interpreter/test/test_objspace.py +++ b/pypy/interpreter/test/test_objspace.py @@ -210,9 +210,7 @@ space = self.space w = space.wrap assert space.text0_w(w("123")) == "123" - exc = space.raises_w(space.w_ValueError, space.text0_w, w("123\x004")) - assert space.unicode0_w(w(u"123")) == u"123" - exc = space.raises_w(space.w_ValueError, space.unicode0_w, w(u"123\x004")) + space.raises_w(space.w_ValueError, space.text0_w, w("123\x004")) def test_text_w(self): space = self.space diff --git a/pypy/interpreter/test/test_unicodehelper.py b/pypy/interpreter/test/test_unicodehelper.py --- a/pypy/interpreter/test/test_unicodehelper.py +++ b/pypy/interpreter/test/test_unicodehelper.py @@ -1,5 +1,6 @@ import py import pytest +from hypothesis import given, strategies import struct import sys from pypy.interpreter.unicodehelper import ( @@ -10,23 +11,13 @@ class Hit(Exception): pass -class FakeSpace: - def __getattr__(self, name): - if name in ('w_UnicodeEncodeError', 'w_UnicodeDecodeError'): - raise Hit - raise AttributeError(name) +from pypy.interpreter.unicodehelper import str_decode_utf8 +from pypy.interpreter.unicodehelper import utf8_encode_ascii, str_decode_ascii +from pypy.interpreter import unicodehelper as uh +from pypy.module._codecs.interp_codecs import CodecState - -def test_encode_utf8(): - space = FakeSpace() - assert encode_utf8(space, u"abc") == "abc" - assert encode_utf8(space, u"\u1234") == "\xe1\x88\xb4" - py.test.raises(Hit, encode_utf8, space, u"\ud800") - py.test.raises(Hit, encode_utf8, space, u"\udc00") - # for the following test, go to lengths to avoid CPython's optimizer - # and .pyc file storage, which collapse the two surrogates into one - c = u"\udc00" - py.test.raises(Hit, encode_utf8, space, u"\ud800" + c) +def decode_utf8(u): + return str_decode_utf8(u, True, "strict", None) def test_encode_utf8_allow_surrogates(): sp = FakeSpace() @@ -45,18 +36,33 @@ assert got == "\xed\xa0\x80\xed\xb0\x80" def test_decode_utf8(): - space = FakeSpace() - assert decode_utf8(space, "abc") == u"abc" - assert decode_utf8(space, "\xe1\x88\xb4") == u"\u1234" - py.test.raises(Hit, decode_utf8, space, "\xed\xa0\x80") - py.test.raises(Hit, decode_utf8, space, "\xed\xb0\x80") - py.test.raises(Hit, decode_utf8, space, "\xed\xa0\x80\xed\xb0\x80") - got = decode_utf8(space, "\xf0\x90\x80\x80") + assert decode_utf8("abc") == ("abc", 3, 3) + assert decode_utf8("\xe1\x88\xb4") == ("\xe1\x88\xb4", 3, 1) + assert decode_utf8("\xed\xa0\x80") == ("\xed\xa0\x80", 3, 1) + py.test.raises(Hit, decode_utf8, "\xed\xa0\x80") + py.test.raises(Hit, decode_utf8, "\xed\xb0\x80") + py.test.raises(Hit, decode_utf8, "\xed\xa0\x80\xed\xb0\x80") + got = decode_utf8("\xf0\x90\x80\x80") if sys.maxunicode > 65535: assert map(ord, got) == [0x10000] else: assert map(ord, got) == [55296, 56320] +def test_utf8_encode_ascii(): + assert utf8_encode_ascii("abc", "??", "??") == "abc" + def eh(errors, encoding, reason, p, start, end): + lst.append((errors, encoding, p, start, end)) + return "<FOO>", end + lst = [] + input = u"\u1234".encode("utf8") + assert utf8_encode_ascii(input, "??", eh) == "<FOO>" + assert lst == [("??", "ascii", input, 0, 1)] + lst = [] + input = u"\u1234\u5678abc\u8765\u4321".encode("utf8") + assert utf8_encode_ascii(input, "??", eh) == "<FOO>abc<FOO>" + assert lst == [("??", "ascii", input, 0, 2), + ("??", "ascii", input, 5, 7)] + def test_decode_utf8_allow_surrogates(): sp = FakeSpace() assert decode_utf8(sp, "\xed\xa0\x80", allow_surrogates=True) == u"\ud800" @@ -90,10 +96,58 @@ return unicode_encode_utf_32_be( u"<%s>" % unich, 3, None, errorhandler, allow_surrogates=False) - assert replace_with(u'rep', None) == u'<rep>'.encode('utf-32-be') assert (replace_with(None, '\xca\xfe\xca\xfe') == '\x00\x00\x00<\xca\xfe\xca\xfe\x00\x00\x00>') with pytest.raises(UnicodeDecodeError): str_decode_utf_32_be(b"\x00\x00\xdc\x80", 4, None) + + +@given(strategies.text()) +def test_utf8_encode_ascii_2(u): + def eh(errors, encoding, reason, p, start, end): + return "?" * (end - start), end + assert utf8_encode_ascii(u.encode("utf8"), "replace", eh) == u.encode("ascii", "replace") + +def test_str_decode_ascii(): + assert str_decode_ascii("abc", "??", True, "??") == ("abc", 3, 3) + def eh(errors, encoding, reason, p, start, end): + lst.append((errors, encoding, p, start, end)) + return u"\u1234\u5678".encode("utf8"), end + lst = [] + input = "\xe8" + exp = u"\u1234\u5678".encode("utf8") + assert str_decode_ascii(input, "??", True, eh) == (exp, 1, 2) + assert lst == [("??", "ascii", input, 0, 1)] + lst = [] + input = "\xe8\xe9abc\xea\xeb" + assert str_decode_ascii(input, "??", True, eh) == ( + exp + exp + "abc" + exp + exp, 7, 11) + assert lst == [("??", "ascii", input, 0, 1), + ("??", "ascii", input, 1, 2), + ("??", "ascii", input, 5, 6), + ("??", "ascii", input, 6, 7)] + +@given(strategies.text()) +def test_unicode_raw_escape(u): + r = uh.utf8_encode_raw_unicode_escape(u.encode("utf8"), 'strict', None) + assert r == u.encode("raw-unicode-escape") + +@given(strategies.text()) +def test_unicode_escape(u): + r = uh.utf8_encode_unicode_escape(u.encode("utf8"), "strict", None) + assert r == u.encode("unicode-escape") + +def test_encode_decimal(space): + assert uh.unicode_encode_decimal(u' 12, 34 ', None) == ' 12, 34 ' + with pytest.raises(ValueError): + uh.unicode_encode_decimal(u' 12, \u1234 '.encode('utf8'), None) + state = space.fromcache(CodecState) + handler = state.encode_error_handler + assert uh.unicode_encode_decimal( + u'u\u1234\u1235v'.encode('utf8'), 'replace', handler) == 'u??v' + + result = uh.unicode_encode_decimal( + u'12\u1234'.encode('utf8'), 'xmlcharrefreplace', handler) + assert result == '12ሴ' diff --git a/pypy/interpreter/unicodehelper.py b/pypy/interpreter/unicodehelper.py --- a/pypy/interpreter/unicodehelper.py +++ b/pypy/interpreter/unicodehelper.py @@ -1,12 +1,12 @@ import sys + from pypy.interpreter.error import OperationError, oefmt from rpython.rlib.objectmodel import specialize -from rpython.rlib.rarithmetic import intmask -from rpython.rlib.rstring import StringBuilder, UnicodeBuilder -from rpython.rlib import runicode -from rpython.rlib.runicode import ( - default_unicode_error_encode, default_unicode_error_decode, - MAXUNICODE, BYTEORDER, BYTEORDER2, UNICHR) +from rpython.rlib.rstring import StringBuilder +from rpython.rlib import rutf8 +from rpython.rlib.rarithmetic import r_uint, intmask +from rpython.rtyper.lltypesystem import rffi +from pypy.module.unicodedata import unicodedb _WIN32 = sys.platform == 'win32' _MACOSX = sys.platform == 'darwin' @@ -32,16 +32,30 @@ @specialize.memo() def encode_error_handler(space): # Fast version of the "strict" errors handler. - def raise_unicode_exception_encode(errors, encoding, msg, u, + def raise_unicode_exception_encode(errors, encoding, msg, utf8, startingpos, endingpos): + u_len = rutf8.get_utf8_length(utf8) raise OperationError(space.w_UnicodeEncodeError, space.newtuple([space.newtext(encoding), - space.newunicode(u), + space.newutf8(utf8, u_len), space.newint(startingpos), space.newint(endingpos), space.newtext(msg)])) return raise_unicode_exception_encode +def default_error_encode( + errors, encoding, msg, u, startingpos, endingpos): + """A default handler, for tests""" + assert endingpos >= 0 + if errors == 'replace': + return '?', endingpos + if errors == 'ignore': + return '', endingpos + raise ValueError + +def convert_arg_to_w_unicode(space, w_arg, strict=None): + return space.convert_arg_to_w_unicode(w_arg) + # ____________________________________________________________ def fsdecode(space, w_string): @@ -112,27 +126,42 @@ from pypy.objspace.std.unicodeobject import encode_object return encode_object(space, w_data, encoding, errors) -# These functions take and return unwrapped rpython strings and unicodes + +def _has_surrogate(u): + for c in u: + if 0xD800 <= ord(c) <= 0xDFFF: + return True + return False + +# These functions take and return unwrapped rpython strings def decode_unicode_escape(space, string): from pypy.module._codecs import interp_codecs state = space.fromcache(interp_codecs.CodecState) unicodedata_handler = state.get_unicodedata_handler(space) - result, consumed = runicode.str_decode_unicode_escape( - string, len(string), "strict", - final=True, errorhandler=decode_error_handler(space), - unicodedata_handler=unicodedata_handler) - return result + result_utf8, consumed, length = str_decode_unicode_escape( + string, "strict", + final=True, + errorhandler=decode_error_handler(space), + ud_handler=unicodedata_handler) + return result_utf8, length def decode_raw_unicode_escape(space, string): - result, consumed = runicode.str_decode_raw_unicode_escape( - string, len(string), "strict", + result_utf8, consumed, lgt = str_decode_raw_unicode_escape( + string, "strict", final=True, errorhandler=decode_error_handler(space)) - return result + return result_utf8, lgt -def decode_utf8(space, string, allow_surrogates=False): - # Note that Python3 tends to forbid *all* surrogates in utf-8. - # If allow_surrogates=True, then revert to the Python 2 behavior, - # i.e. surrogates are accepted and not treated specially at all. +def check_ascii_or_raise(space, string): + try: + rutf8.check_ascii(string) + except rutf8.CheckError as e: + decode_error_handler(space)('strict', 'ascii', + 'ordinal not in range(128)', string, + e.pos, e.pos + 1) + assert False, "unreachable" + +def check_utf8_or_raise(space, string, start=0, end=-1): + # Surrogates are accepted and not treated specially at all. # If there happen to be two 3-bytes encoding a pair of surrogates, # you still get two surrogate unicode characters in the result. assert isinstance(string, str) @@ -142,61 +171,832 @@ allow_surrogates=allow_surrogates) return result -def encode_utf8(space, uni, allow_surrogates=False): - # Note that Python3 tends to forbid *all* surrogates in utf-8. - # If allow_surrogates=True, then revert to the Python 2 behavior - # which never raises UnicodeEncodeError. Surrogate pairs are then - # allowed, either paired or lone. A paired surrogate is considered - # like the non-BMP character it stands for. See also *_utf8sp(). - assert isinstance(uni, unicode) - return runicode.unicode_encode_utf_8( - uni, len(uni), "strict", - errorhandler=encode_error_handler(space), - allow_surrogates=allow_surrogates) +def str_decode_ascii(s, errors, final, errorhandler): + try: + rutf8.check_ascii(s) + return s, len(s), len(s) + except rutf8.CheckError: + return _str_decode_ascii_slowpath(s, errors, final, errorhandler) -def encode_utf8sp(space, uni): - # Surrogate-preserving utf-8 encoding. Any surrogate character - # turns into its 3-bytes encoding, whether it is paired or not. - # This should always be reversible, and the reverse is - # decode_utf8sp(). - return runicode.unicode_encode_utf8sp(uni, len(uni)) +def _str_decode_ascii_slowpath(s, errors, final, errorhandler): + i = 0 + res = StringBuilder() + while i < len(s): + ch = s[i] + if ord(ch) > 0x7F: + r, i = errorhandler(errors, 'ascii', 'ordinal not in range(128)', + s, i, i + 1) + res.append(r) + else: + res.append(ch) + i += 1 + ress = res.build() + lgt = rutf8.check_utf8(ress, True) + return ress, len(s), lgt -def decode_utf8sp(space, string): - # Surrogate-preserving utf-8 decoding. Assuming there is no - # encoding error, it should always be reversible, and the reverse is - # encode_utf8sp(). - return decode_utf8(space, string, allow_surrogates=True) +def str_decode_latin_1(s, errors, final, errorhandler): + try: + rutf8.check_ascii(s) + return s, len(s), len(s) + except rutf8.CheckError: + return _str_decode_latin_1_slowpath(s, errors, final, errorhandler) + +def _str_decode_latin_1_slowpath(s, errors, final, errorhandler): + res = StringBuilder(len(s)) + i = 0 + while i < len(s): + if ord(s[i]) > 0x7F: + while i < len(s) and ord(s[i]) > 0x7F: + rutf8.unichr_as_utf8_append(res, ord(s[i])) + i += 1 + else: + start = i + end = i + 1 + while end < len(s) and ord(s[end]) <= 0x7F: + end += 1 + res.append_slice(s, start, end) + i = end + # cannot be ASCII, cannot have surrogates, I believe + return res.build(), len(s), len(s) + +def utf8_encode_latin_1(s, errors, errorhandler): + try: + rutf8.check_ascii(s) + return s + except rutf8.CheckError: + return _utf8_encode_latin_1_slowpath(s, errors, errorhandler) + +def _utf8_encode_latin_1_slowpath(s, errors, errorhandler): + size = len(s) + result = StringBuilder(size) + index = 0 + pos = 0 + while pos < size: + ch = rutf8.codepoint_at_pos(s, pos) + if ch <= 0xFF: + result.append(chr(ch)) + index += 1 + pos = rutf8.next_codepoint_pos(s, pos) + else: + startindex = index + pos = rutf8.next_codepoint_pos(s, pos) + index += 1 + while pos < size and rutf8.codepoint_at_pos(s, pos) > 0xFF: + pos = rutf8.next_codepoint_pos(s, pos) + index += 1 + msg = "ordinal not in range(256)" + res_8, newindex = errorhandler( + errors, 'latin1', msg, s, startindex, index) + for cp in rutf8.Utf8StringIterator(res_8): + if cp > 0xFF: + errorhandler("strict", 'latin1', msg, s, startindex, index) + result.append(chr(cp)) + if index != newindex: # Should be uncommon + index = newindex + pos = rutf8._pos_at_index(s, newindex) + return result.build() + +def utf8_encode_ascii(s, errors, errorhandler): + """ Don't be confused - this is a slowpath for errors e.g. "ignore" + or an obscure errorhandler + """ + size = len(s) + result = StringBuilder(size) + index = 0 + pos = 0 + while pos < size: + ch = rutf8.codepoint_at_pos(s, pos) + if ch <= 0x7F: + result.append(chr(ch)) + index += 1 + pos = rutf8.next_codepoint_pos(s, pos) + else: + startindex = index + pos = rutf8.next_codepoint_pos(s, pos) + index += 1 + while pos < size and rutf8.codepoint_at_pos(s, pos) > 0x7F: + pos = rutf8.next_codepoint_pos(s, pos) + index += 1 + msg = "ordinal not in range(128)" + res_8, newindex = errorhandler( + errors, 'ascii', msg, s, startindex, index) + for cp in rutf8.Utf8StringIterator(res_8): + if cp > 0x7F: + errorhandler("strict", 'ascii', msg, s, startindex, index) + result.append(chr(cp)) + if index != newindex: # Should be uncommon + index = newindex + pos = rutf8._pos_at_index(s, newindex) + return result.build() + +if sys.platform == 'win32': + def utf8_encode_mbcs(s, errors, errorhandler): + from rpython.rlib import runicode + s = s.decode('utf-8') + slen = len(s) + res = runicode.unicode_encode_mbcs(s, slen, errors, errorhandler) + return res + + def str_decode_mbcs(s, errors, final, errorhandler): + from rpython.rlib import runicode + slen = len(s) + res, size = runicode.str_decode_mbcs(s, slen, final=final, errors=errors, + errorhandler=errorhandler) + return res.encode('utf8'), size, len(res) + +def str_decode_utf8(s, errors, final, errorhandler): + """ Same as checking for the valid utf8, but we know the utf8 is not + valid so we're trying to either raise or pack stuff with error handler. + The key difference is that this is call_may_force + """ + slen = len(s) + res = StringBuilder(slen) + pos = 0 + end = len(s) + while pos < end: + ordch1 = ord(s[pos]) + # fast path for ASCII + if ordch1 <= 0x7F: + pos += 1 + res.append(chr(ordch1)) + continue + + if ordch1 <= 0xC1: + r, pos = errorhandler(errors, "utf8", "invalid start byte", + s, pos, pos + 1) + res.append(r) + continue + + pos += 1 + + if ordch1 <= 0xDF: + if pos >= end: + if not final: + pos -= 1 + break + r, pos = errorhandler(errors, "utf8", "unexpected end of data", + s, pos - 1, pos) + res.append(r) + continue + ordch2 = ord(s[pos]) + + if rutf8._invalid_byte_2_of_2(ordch2): + r, pos = errorhandler(errors, "utf8", "invalid continuation byte", + s, pos - 1, pos) + res.append(r) + continue + # 110yyyyy 10zzzzzz -> 00000000 00000yyy yyzzzzzz + pos += 1 + res.append(chr(ordch1)) + res.append(chr(ordch2)) + continue + + if ordch1 <= 0xEF: + if (pos + 2) > end: + if not final: + pos -= 1 + break + r, pos = errorhandler(errors, "utf8", "unexpected end of data", + s, pos - 1, pos + 1) + res.append(r) + continue + ordch2 = ord(s[pos]) + ordch3 = ord(s[pos + 1]) + + if rutf8._invalid_byte_2_of_3(ordch1, ordch2, True): + r, pos = errorhandler(errors, "utf8", "invalid continuation byte", + s, pos - 1, pos) + res.append(r) + continue + elif rutf8._invalid_byte_3_of_3(ordch3): + r, pos = errorhandler(errors, "utf8", "invalid continuation byte", + s, pos - 1, pos + 1) + res.append(r) + continue + pos += 2 + + # 1110xxxx 10yyyyyy 10zzzzzz -> 00000000 xxxxyyyy yyzzzzzz + res.append(chr(ordch1)) + res.append(chr(ordch2)) + res.append(chr(ordch3)) + continue + + if ordch1 <= 0xF4: + if (pos + 3) > end: + if not final: + pos -= 1 + break + r, pos = errorhandler(errors, "utf8", "unexpected end of data", + s, pos - 1, pos) + res.append(r) + continue + ordch2 = ord(s[pos]) + ordch3 = ord(s[pos + 1]) + ordch4 = ord(s[pos + 2]) + + if rutf8._invalid_byte_2_of_4(ordch1, ordch2): + r, pos = errorhandler(errors, "utf8", "invalid continuation byte", + s, pos - 1, pos) + res.append(r) + continue + elif rutf8._invalid_byte_3_of_4(ordch3): + r, pos = errorhandler(errors, "utf8", "invalid continuation byte", + s, pos - 1, pos + 1) + res.append(r) + continue + elif rutf8._invalid_byte_4_of_4(ordch4): + r, pos = errorhandler(errors, "utf8", "invalid continuation byte", + s, pos - 1, pos + 2) + res.append(r) + continue + + pos += 3 + # 11110www 10xxxxxx 10yyyyyy 10zzzzzz -> 000wwwxx xxxxyyyy yyzzzzzz + res.append(chr(ordch1)) + res.append(chr(ordch2)) + res.append(chr(ordch3)) + res.append(chr(ordch4)) + continue + + r, pos = errorhandler(errors, "utf8", "invalid start byte", + s, pos - 1, pos) + res.append(r) + + r = res.build() + return r, pos, rutf8.check_utf8(r, True) + +hexdigits = "0123456789ABCDEFabcdef" + +def hexescape(builder, s, pos, digits, + encoding, errorhandler, message, errors): + chr = 0 + if pos + digits > len(s): + endinpos = pos + while endinpos < len(s) and s[endinpos] in hexdigits: + endinpos += 1 + res, pos = errorhandler( + errors, encoding, message, s, pos - 2, endinpos) + builder.append(res) + else: + try: + chr = int(s[pos:pos + digits], 16) + except ValueError: + endinpos = pos + while s[endinpos] in hexdigits: + endinpos += 1 + res, pos = errorhandler( + errors, encoding, message, s, pos - 2, endinpos) + builder.append(res) + else: + # when we get here, chr is a 32-bit unicode character + try: + builder.append_code(chr) + pos += digits + except ValueError: + message = "illegal Unicode character" + res, pos = errorhandler( + errors, encoding, message, s, pos - 2, pos + digits) + builder.append(res) + return pos + +def str_decode_unicode_escape(s, errors, final, errorhandler, ud_handler): + size = len(s) + if size == 0: + return '', 0, 0 + + builder = rutf8.Utf8StringBuilder(size) + pos = 0 + while pos < size: + ch = s[pos] + + # Non-escape characters are interpreted as Unicode ordinals + if ch != '\\': + if ord(ch) > 0x7F: + builder.append_code(ord(ch)) + else: + builder.append(ch) + pos += 1 + continue + + # - Escapes + pos += 1 + if pos >= size: + message = "\\ at end of string" + res, pos = errorhandler(errors, "unicodeescape", + message, s, pos - 1, size) + builder.append(res) + continue + + ch = s[pos] + pos += 1 + # \x escapes + if ch == '\n': + pass + elif ch == '\\': + builder.append_char('\\') + elif ch == '\'': + builder.append_char('\'') + elif ch == '\"': + builder.append_char('\"') + elif ch == 'b': + builder.append_char('\b') + elif ch == 'f': + builder.append_char('\f') + elif ch == 't': + builder.append_char('\t') + elif ch == 'n': + builder.append_char('\n') + elif ch == 'r': + builder.append_char('\r') + elif ch == 'v': + builder.append_char('\v') + elif ch == 'a': + builder.append_char('\a') + elif '0' <= ch <= '7': + x = ord(ch) - ord('0') + if pos < size: + ch = s[pos] + if '0' <= ch <= '7': + pos += 1 + x = (x << 3) + ord(ch) - ord('0') + if pos < size: + ch = s[pos] + if '0' <= ch <= '7': + pos += 1 + x = (x << 3) + ord(ch) - ord('0') + if x > 0x7F: + builder.append_code(x) + else: + builder.append_char(chr(x)) + # hex escapes + # \xXX + elif ch == 'x': + digits = 2 + message = "truncated \\xXX escape" + pos = hexescape(builder, s, pos, digits, + "unicodeescape", errorhandler, message, errors) + # \uXXXX + elif ch == 'u': + digits = 4 + message = "truncated \\uXXXX escape" + pos = hexescape(builder, s, pos, digits, + "unicodeescape", errorhandler, message, errors) + # \UXXXXXXXX + elif ch == 'U': + digits = 8 + message = "truncated \\UXXXXXXXX escape" + pos = hexescape(builder, s, pos, digits, + "unicodeescape", errorhandler, message, errors) + # \N{name} + elif ch == 'N' and ud_handler is not None: + message = "malformed \\N character escape" + look = pos + + if look < size and s[look] == '{': + # look for the closing brace + while look < size and s[look] != '}': + look += 1 + if look < size and s[look] == '}': + # found a name. look it up in the unicode database + message = "unknown Unicode character name" + name = s[pos + 1:look] + code = ud_handler.call(name) + if code < 0: + res, pos = errorhandler( + errors, "unicodeescape", message, + s, pos - 1, look + 1) + builder.append(res) + continue + pos = look + 1 + builder.append_code(code) + else: + res, pos = errorhandler(errors, "unicodeescape", + message, s, pos - 1, look + 1) + builder.append(res) + else: + res, pos = errorhandler(errors, "unicodeescape", + message, s, pos - 1, look + 1) + builder.append(res) + else: + builder.append_char('\\') + builder.append_code(ord(ch)) + + return builder.build(), pos, builder.getlength() + +def wcharpsize2utf8(space, wcharp, size): + """Safe version of rffi.wcharpsize2utf8. + + Raises app-level ValueError if any wchar value is outside the valid + codepoint range. + """ + try: + return rffi.wcharpsize2utf8(wcharp, size) + except ValueError: + raise oefmt(space.w_ValueError, + "character is not in range [U+0000; U+10ffff]") + + +# ____________________________________________________________ +# Raw unicode escape + +def str_decode_raw_unicode_escape(s, errors, final=False, + errorhandler=None): + size = len(s) + if size == 0: + return '', 0, 0 + + builder = rutf8.Utf8StringBuilder(size) + pos = 0 + while pos < size: + ch = s[pos] + + # Non-escape characters are interpreted as Unicode ordinals + if ch != '\\': + builder.append_code(ord(ch)) + pos += 1 + continue + + # \u-escapes are only interpreted iff the number of leading + # backslashes is odd + bs = pos + while pos < size: + pos += 1 + if pos == size or s[pos] != '\\': + break + builder.append_char('\\') + + # we have a backslash at the end of the string, stop here + if pos >= size: + builder.append_char('\\') + break + + if ((pos - bs) & 1 == 0 or pos >= size or + (s[pos] != 'u' and s[pos] != 'U')): + builder.append_char('\\') + builder.append_code(ord(s[pos])) + pos += 1 + continue + + digits = 4 if s[pos] == 'u' else 8 + message = "truncated \\uXXXX" + pos += 1 + pos = hexescape(builder, s, pos, digits, + "rawunicodeescape", errorhandler, message, errors) + + return builder.build(), pos, builder.getlength() + +_utf8_encode_unicode_escape = rutf8.make_utf8_escape_function() + + +TABLE = '0123456789abcdef' + +def raw_unicode_escape_helper(result, char): + if char >= 0x10000 or char < 0: + result.append("\\U") + zeros = 8 + elif char >= 0x100: + result.append("\\u") + zeros = 4 + else: + result.append("\\x") + zeros = 2 + for i in range(zeros-1, -1, -1): + result.append(TABLE[(char >> (4 * i)) & 0x0f]) + +def utf8_encode_raw_unicode_escape(s, errors, errorhandler): + # errorhandler is not used: this function cannot cause Unicode errors + size = len(s) + if size == 0: + return '' + result = StringBuilder(size) + pos = 0 + while pos < size: + oc = rutf8.codepoint_at_pos(s, pos) + + if oc < 0x100: + result.append(chr(oc)) + else: + raw_unicode_escape_helper(result, oc) + pos = rutf8.next_codepoint_pos(s, pos) + + return result.build() + + +def utf8_encode_unicode_escape(s, errors, errorhandler): + return _utf8_encode_unicode_escape(s) + +# ____________________________________________________________ +# utf-7 + +# Three simple macros defining base-64 + +def _utf7_IS_BASE64(oc): + "Is c a base-64 character?" + c = chr(oc) + return c.isalnum() or c == '+' or c == '/' +def _utf7_TO_BASE64(n): + "Returns the base-64 character of the bottom 6 bits of n" + return "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"[n & 0x3f] +def _utf7_FROM_BASE64(c): + "given that c is a base-64 character, what is its base-64 value?" + if c >= 'a': + return ord(c) - 71 + elif c >= 'A': + return ord(c) - 65 + elif c >= '0': + return ord(c) + 4 + elif c == '+': + return 62 + else: # c == '/' + return 63 + +def _utf7_DECODE_DIRECT(oc): + return oc <= 127 and oc != ord('+') + +# The UTF-7 encoder treats ASCII characters differently according to +# whether they are Set D, Set O, Whitespace, or special (i.e. none of +# the above). See RFC2152. This array identifies these different +# sets: +# 0 : "Set D" +# alphanumeric and '(),-./:? +# 1 : "Set O" +# !"#$%&*;<=>@[]^_`{|} +# 2 : "whitespace" +# ht nl cr sp +# 3 : special (must be base64 encoded) +# everything else (i.e. +\~ and non-printing codes 0-8 11-12 14-31 127) + +utf7_category = [ +# nul soh stx etx eot enq ack bel bs ht nl vt np cr so si + 3, 3, 3, 3, 3, 3, 3, 3, 3, 2, 2, 3, 3, 2, 3, 3, +# dle dc1 dc2 dc3 dc4 nak syn etb can em sub esc fs gs rs us + 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, +# sp ! " # $ % & ' ( ) * + , - . / + 2, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 3, 0, 0, 0, 0, +# 0 1 2 3 4 5 6 7 8 9 : ; < = > ? + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, +# @ A B C D E F G H I J K L M N O + 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, +# P Q R S T U V W X Y Z [ \ ] ^ _ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 3, 1, 1, 1, +# ` a b c d e f g h i j k l m n o + 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, +# p q r s t u v w x y z { | } ~ del + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 3, 3, +] + +# ENCODE_DIRECT: this character should be encoded as itself. The +# answer depends on whether we are encoding set O as itself, and also +# on whether we are encoding whitespace as itself. RFC2152 makes it +# clear that the answers to these questions vary between +# applications, so this code needs to be flexible. + +def _utf7_ENCODE_DIRECT(oc, directO, directWS): + return(oc < 128 and oc > 0 and + (utf7_category[oc] == 0 or + (directWS and utf7_category[oc] == 2) or + (directO and utf7_category[oc] == 1))) + +def _utf7_ENCODE_CHAR(result, oc, base64bits, base64buffer): + if oc >= 0x10000: + # code first surrogate + base64bits += 16 + base64buffer = (base64buffer << 16) | 0xd800 | ((oc-0x10000) >> 10) + while base64bits >= 6: + result.append(_utf7_TO_BASE64(base64buffer >> (base64bits-6))) + base64bits -= 6 + # prepare second surrogate + oc = 0xDC00 | ((oc-0x10000) & 0x3FF) + base64bits += 16 + base64buffer = (base64buffer << 16) | oc + while base64bits >= 6: + result.append(_utf7_TO_BASE64(base64buffer >> (base64bits-6))) + base64bits -= 6 + return base64bits, base64buffer + +def str_decode_utf_7(s, errors, final=False, + errorhandler=None): + size = len(s) + if size == 0: + return '', 0, 0 + + inShift = False + base64bits = 0 + base64buffer = 0 + surrogate = 0 + outsize = 0 + + result = StringBuilder(size) + pos = 0 + shiftOutStartPos = 0 + startinpos = 0 + while pos < size: + ch = s[pos] + + if inShift: # in a base-64 section + if _utf7_IS_BASE64(ord(ch)): #consume a base-64 character + base64buffer = (base64buffer << 6) | _utf7_FROM_BASE64(ch) + assert base64buffer >= 0 + base64bits += 6 + pos += 1 + + if base64bits >= 16: + # enough bits for a UTF-16 value + outCh = base64buffer >> (base64bits - 16) + assert outCh >= 0 + base64bits -= 16 + base64buffer &= (1 << base64bits) - 1 # clear high bits + assert outCh <= 0xffff + if surrogate: + # expecting a second surrogate + if outCh >= 0xDC00 and outCh <= 0xDFFF: + code = (((surrogate & 0x3FF)<<10) | + (outCh & 0x3FF)) + 0x10000 + rutf8.unichr_as_utf8_append(result, code) + outsize += 1 + surrogate = 0 + continue + else: + rutf8.unichr_as_utf8_append(result, surrogate, + allow_surrogates=True) + outsize += 1 + surrogate = 0 + # Not done with outCh: falls back to next line + if outCh >= 0xD800 and outCh <= 0xDBFF: + # first surrogate + surrogate = outCh + else: + outsize += 1 + assert outCh >= 0 + rutf8.unichr_as_utf8_append(result, outCh, True) + + else: + # now leaving a base-64 section + inShift = False + + if base64bits > 0: # left-over bits + if base64bits >= 6: + # We've seen at least one base-64 character + pos += 1 + msg = "partial character in shift sequence" + res, pos = errorhandler(errors, 'utf7', + msg, s, pos-1, pos) + reslen = rutf8.check_utf8(res, True) + outsize += reslen + result.append(res) + continue + else: + # Some bits remain; they should be zero + if base64buffer != 0: + pos += 1 + msg = "non-zero padding bits in shift sequence" + res, pos = errorhandler(errors, 'utf7', + msg, s, pos-1, pos) + reslen = rutf8.check_utf8(res, True) + outsize += reslen + result.append(res) + continue + + if surrogate and _utf7_DECODE_DIRECT(ord(ch)): + outsize += 1 + rutf8.unichr_as_utf8_append(result, surrogate, True) + surrogate = 0 + + if ch == '-': + # '-' is absorbed; other terminating characters are + # preserved + pos += 1 + + elif ch == '+': + startinpos = pos + pos += 1 # consume '+' + if pos < size and s[pos] == '-': # '+-' encodes '+' + pos += 1 + result.append('+') + outsize += 1 + else: # begin base64-encoded section + inShift = 1 + surrogate = 0 + shiftOutStartPos = result.getlength() + base64bits = 0 + base64buffer = 0 + + elif _utf7_DECODE_DIRECT(ord(ch)): # character decodes at itself + result.append(ch) + outsize += 1 + pos += 1 + else: + startinpos = pos + pos += 1 + msg = "unexpected special character" + res, pos = errorhandler(errors, 'utf7', msg, s, pos-1, pos) + reslen = rutf8.check_utf8(res, True) + outsize += reslen + result.append(res) + + # end of string + final_length = result.getlength() + if inShift and final: # in shift sequence, no more to follow + # if we're in an inconsistent state, that's an error + inShift = 0 + if (surrogate or + base64bits >= 6 or + (base64bits > 0 and base64buffer != 0)): + msg = "unterminated shift sequence" + res, pos = errorhandler(errors, 'utf7', msg, s, shiftOutStartPos, pos) + reslen = rutf8.check_utf8(res, True) + outsize += reslen + result.append(res) + final_length = result.getlength() + elif inShift: + pos = startinpos + final_length = shiftOutStartPos # back off output + + assert final_length >= 0 + return result.build()[:final_length], pos, outsize + +def utf8_encode_utf_7(s, errors, errorhandler): + size = len(s) + if size == 0: + return '' + result = StringBuilder(size) + + encodeSetO = encodeWhiteSpace = False + + inShift = False + base64bits = 0 + base64buffer = 0 + + pos = 0 + while pos < size: + oc = rutf8.codepoint_at_pos(s, pos) + if not inShift: + if oc == ord('+'): + result.append('+-') + elif _utf7_ENCODE_DIRECT(oc, not encodeSetO, not encodeWhiteSpace): + result.append(chr(oc)) + else: + result.append('+') + inShift = True + base64bits, base64buffer = _utf7_ENCODE_CHAR( + result, oc, base64bits, base64buffer) + else: + if _utf7_ENCODE_DIRECT(oc, not encodeSetO, not encodeWhiteSpace): + # shifting out + if base64bits: # output remaining bits + result.append(_utf7_TO_BASE64(base64buffer << (6-base64bits))) + base64buffer = 0 + base64bits = 0 + + inShift = False + ## Characters not in the BASE64 set implicitly unshift the + ## sequence so no '-' is required, except if the character is + ## itself a '-' + if _utf7_IS_BASE64(oc) or oc == ord('-'): + result.append('-') + result.append(chr(oc)) + else: + base64bits, base64buffer = _utf7_ENCODE_CHAR( + result, oc, base64bits, base64buffer) + pos = rutf8.next_codepoint_pos(s, pos) + + if base64bits: + result.append(_utf7_TO_BASE64(base64buffer << (6 - base64bits))) + if inShift: + result.append('-') + + return result.build() # ____________________________________________________________ # utf-16 -def str_decode_utf_16(s, size, errors, final=True, - errorhandler=None): - result, length, byteorder = str_decode_utf_16_helper(s, size, errors, final, - errorhandler, "native", - 'utf-16-' + BYTEORDER2) - return result, length +BYTEORDER = sys.byteorder +BYTEORDER2 = BYTEORDER[0] + 'e' # either "le" or "be" +assert BYTEORDER2 in ('le', 'be') -def str_decode_utf_16_be(s, size, errors, final=True, - errorhandler=None): - result, length, byteorder = str_decode_utf_16_helper(s, size, errors, final, - errorhandler, "big", - 'utf-16-be') - return result, length +def str_decode_utf_16(s, errors, final=True, + errorhandler=None): + result, c, lgt, _ = str_decode_utf_16_helper(s, errors, final, + errorhandler, "native") + return result, c, lgt -def str_decode_utf_16_le(s, size, errors, final=True, - errorhandler=None): - result, length, byteorder = str_decode_utf_16_helper(s, size, errors, final, - errorhandler, "little", - 'utf-16-le') - return result, length +def str_decode_utf_16_be(s, errors, final=True, + errorhandler=None): + result, c, lgt, _ = str_decode_utf_16_helper(s, errors, final, + errorhandler, "big") + return result, c, lgt -def str_decode_utf_16_helper(s, size, errors, final=True, +def str_decode_utf_16_le(s, errors, final=True, + errorhandler=None): + result, c, lgt, _ = str_decode_utf_16_helper(s, errors, final, + errorhandler, "little") + return result, c, lgt + +def str_decode_utf_16_helper(s, errors, final=True, errorhandler=None, byteorder="native", public_encoding_name='utf16'): - if errorhandler is None: - errorhandler = default_unicode_error_decode + size = len(s) bo = 0 if BYTEORDER == 'little': @@ -233,7 +1033,7 @@ else: bo = 1 if size == 0: - return u'', 0, bo + return '', 0, 0, bo if bo == -1: # force little endian ihi = 1 @@ -244,7 +1044,7 @@ ihi = 0 ilo = 1 - result = UnicodeBuilder(size // 2) + result = StringBuilder(size // 2) #XXX I think the errors are not correctly handled here while pos < size: @@ -261,7 +1061,7 @@ ch = (ord(s[pos + ihi]) << 8) | ord(s[pos + ilo]) pos += 2 if ch < 0xD800 or ch > 0xDFFF: - result.append(unichr(ch)) + rutf8.unichr_as_utf8_append(result, ch) continue # UTF-16 code pair: if len(s) - pos < 2: @@ -278,12 +1078,8 @@ ch2 = (ord(s[pos+ihi]) << 8) | ord(s[pos+ilo]) pos += 2 if 0xDC00 <= ch2 <= 0xDFFF: - if MAXUNICODE < 65536: - result.append(unichr(ch)) - result.append(unichr(ch2)) - else: - result.append(UNICHR((((ch & 0x3FF)<<10) | - (ch2 & 0x3FF)) + 0x10000)) + ch = (((ch & 0x3FF)<<10) | (ch2 & 0x3FF)) + 0x10000 + rutf8.unichr_as_utf8_append(result, ch) continue else: r, pos = errorhandler(errors, public_encoding_name, @@ -295,7 +1091,9 @@ "illegal encoding", s, pos - 2, pos) result.append(r) - return result.build(), pos, bo + r = result.build() + lgt = rutf8.check_utf8(r, True) + return result.build(), pos, lgt, bo def _STORECHAR(result, CH, byteorder): hi = chr(((CH) >> 8) & 0xff) @@ -307,13 +1105,12 @@ result.append(hi) result.append(lo) -def unicode_encode_utf_16_helper(s, size, errors, +def unicode_encode_utf_16_helper(s, errors, errorhandler=None, allow_surrogates=True, byteorder='little', public_encoding_name='utf16'): - if errorhandler is None: - errorhandler = default_unicode_error_encode + size = len(s) if size == 0: if byteorder == 'native': result = StringBuilder(2) @@ -327,9 +1124,9 @@ byteorder = BYTEORDER pos = 0 + index = 0 while pos < size: - ch = ord(s[pos]) - pos += 1 + ch = rutf8.codepoint_at_pos(s, pos) if ch < 0xD800: _STORECHAR(result, ch, byteorder) @@ -339,46 +1136,44 @@ elif ch >= 0xE000 or allow_surrogates: _STORECHAR(result, ch, byteorder) else: - ru, rs, pos = errorhandler(errors, public_encoding_name, - 'surrogates not allowed', - s, pos-1, pos) - if rs is not None: - # py3k only - if len(rs) % 2 != 0: - errorhandler('strict', public_encoding_name, - 'surrogates not allowed', - s, pos-1, pos) - result.append(rs) - continue - for ch in ru: - if ord(ch) < 0xD800: - _STORECHAR(result, ord(ch), byteorder) + res_8, newindex = errorhandler( + errors, public_encoding_name, 'surrogates not allowed', + s, pos - 1, pos) + for cp in rutf8.Utf8StringIterator(res_8): + if cp < 0xD800: + _STORECHAR(result, cp, byteorder) else: errorhandler('strict', public_encoding_name, 'surrogates not allowed', s, pos-1, pos) + if index != newindex: # Should be uncommon + index = newindex + pos = rutf8._pos_at_index(s, newindex) continue + pos = rutf8.next_codepoint_pos(s, pos) + index += 1 + return result.build() -def unicode_encode_utf_16(s, size, errors, +def utf8_encode_utf_16(s, errors, errorhandler=None, allow_surrogates=True): - return unicode_encode_utf_16_helper(s, size, errors, errorhandler, + return unicode_encode_utf_16_helper(s, errors, errorhandler, allow_surrogates, "native", 'utf-16-' + BYTEORDER2) -def unicode_encode_utf_16_be(s, size, errors, +def utf8_encode_utf_16_be(s, errors, errorhandler=None, allow_surrogates=True): - return unicode_encode_utf_16_helper(s, size, errors, errorhandler, + return unicode_encode_utf_16_helper(s, errors, errorhandler, allow_surrogates, "big", 'utf-16-be') -def unicode_encode_utf_16_le(s, size, errors, +def utf8_encode_utf_16_le(s, errors, errorhandler=None, allow_surrogates=True): - return unicode_encode_utf_16_helper(s, size, errors, errorhandler, + return unicode_encode_utf_16_helper(s, errors, errorhandler, allow_surrogates, "little", 'utf-16-le') @@ -386,38 +1181,38 @@ # ____________________________________________________________ # utf-32 -def str_decode_utf_32(s, size, errors, final=True, - errorhandler=None): - result, length, byteorder = str_decode_utf_32_helper( +def str_decode_utf_32(s, errors, final=True, + errorhandler=None): + result, c, lgt, _ = str_decode_utf_32_helper(s, errors, final, s, size, errors, final, errorhandler, "native", 'utf-32-' + BYTEORDER2, allow_surrogates=False) - return result, length + return result, c, lgt -def str_decode_utf_32_be(s, size, errors, final=True, - errorhandler=None): - result, length, byteorder = str_decode_utf_32_helper( - s, size, errors, final, errorhandler, "big", 'utf-32-be', +def str_decode_utf_32_be(s, errors, final=True, + errorhandler=None): + result, c, lgt, _ = str_decode_utf_32_helper( + s, errors, final, errorhandler, "big", 'utf-32-be', allow_surrogates=False) - return result, length + return result, c, lgt -def str_decode_utf_32_le(s, size, errors, final=True, - errorhandler=None): - result, length, byteorder = str_decode_utf_32_helper( - s, size, errors, final, errorhandler, "little", 'utf-32-le', +def str_decode_utf_32_le(s, errors, final=True, + errorhandler=None): + result, c, lgt, _ = str_decode_utf_32_helper( + s, errors, final, errorhandler, "little", 'utf-32-le', allow_surrogates=False) - return result, length + return result, c, lgt -BOM32_DIRECT = intmask(0x0000FEFF) +BOM32_DIRECT = intmask(0x0000FEFF) BOM32_REVERSE = intmask(0xFFFE0000) -def str_decode_utf_32_helper(s, size, errors, final=True, - errorhandler=None, +def str_decode_utf_32_helper(s, errors, final, + errorhandler, byteorder="native", public_encoding_name='utf32', allow_surrogates=True): - if errorhandler is None: - errorhandler = default_unicode_error_decode + assert errorhandler is not None bo = 0 + size = len(s) if BYTEORDER == 'little': iorder = [0, 1, 2, 3] @@ -453,7 +1248,7 @@ else: bo = 1 if size == 0: - return u'', 0, bo + return '', 0, 0, bo if bo == -1: # force little endian iorder = [0, 1, 2, 3] @@ -461,7 +1256,7 @@ # force big endian iorder = [3, 2, 1, 0] - result = UnicodeBuilder(size // 4) + result = StringBuilder(size // 4) while pos < size: # remaining bytes at the end? (size should be divisible by 4) @@ -476,7 +1271,7 @@ break continue ch = ((ord(s[pos + iorder[3]]) << 24) | (ord(s[pos + iorder[2]]) << 16) | - (ord(s[pos + iorder[1]]) << 8) | ord(s[pos + iorder[0]])) + (ord(s[pos + iorder[1]]) << 8) | ord(s[pos + iorder[0]])) if not allow_surrogates and 0xD800 <= ch <= 0xDFFF: r, pos = errorhandler(errors, public_encoding_name, "code point in surrogate code point " @@ -487,18 +1282,15 @@ elif ch >= 0x110000: r, pos = errorhandler(errors, public_encoding_name, "codepoint not in range(0x110000)", - s, pos, pos + 4) + s, pos, len(s)) result.append(r) continue - if MAXUNICODE < 65536 and ch >= 0x10000: - ch -= 0x10000L - result.append(unichr(0xD800 + (ch >> 10))) - result.append(unichr(0xDC00 + (ch & 0x03FF))) - else: - result.append(UNICHR(ch)) + rutf8.unichr_as_utf8_append(result, ch, allow_surrogates=allow_surrogates) pos += 4 - return result.build(), pos, bo + r = result.build() + lgt = rutf8.check_utf8(r, True) + return r, pos, lgt, bo def _STORECHAR32(result, CH, byteorder): c0 = chr(((CH) >> 24) & 0xff) @@ -516,13 +1308,12 @@ result.append(c2) result.append(c3) -def unicode_encode_utf_32_helper(s, size, errors, +def unicode_encode_utf_32_helper(s, errors, errorhandler=None, allow_surrogates=True, byteorder='little', public_encoding_name='utf32'): - if errorhandler is None: - errorhandler = default_unicode_error_encode + size = len(s) if size == 0: if byteorder == 'native': result = StringBuilder(4) @@ -536,53 +1327,258 @@ byteorder = BYTEORDER pos = 0 + index = 0 while pos < size: - ch = ord(s[pos]) - pos += 1 - ch2 = 0 + ch = rutf8.codepoint_at_pos(s, pos) + pos = rutf8.next_codepoint_pos(s, pos) if not allow_surrogates and 0xD800 <= ch < 0xE000: - ru, rs, pos = errorhandler( + res_8, newindex = errorhandler( errors, public_encoding_name, 'surrogates not allowed', s, pos - 1, pos) - if rs is not None: - # py3k only - if len(rs) % 4 != 0: + for ch in rutf8.Utf8StringIterator(res_8): + if ch < 0xD800: + _STORECHAR32(result, ch, byteorder) + else: errorhandler( 'strict', public_encoding_name, 'surrogates not allowed', s, pos - 1, pos) - result.append(rs) - continue - for ch in ru: - if ord(ch) < 0xD800: - _STORECHAR32(result, ord(ch), byteorder) - else: - errorhandler( - 'strict', public_encoding_name, - 'surrogates not allowed', s, pos - 1, pos) + if index != newindex: # Should be uncommon + index = newindex + pos = rutf8._pos_at_index(s, newindex) continue - if 0xD800 <= ch < 0xDC00 and MAXUNICODE < 65536 and pos < size: - ch2 = ord(s[pos]) - if 0xDC00 <= ch2 < 0xE000: - ch = (((ch & 0x3FF) << 10) | (ch2 & 0x3FF)) + 0x10000 - pos += 1 _STORECHAR32(result, ch, byteorder) + index += 1 return result.build() -def unicode_encode_utf_32(s, size, errors, - errorhandler=None, allow_surrogates=True): - return unicode_encode_utf_32_helper(s, size, errors, errorhandler, +def utf8_encode_utf_32(s, errors, + errorhandler=None, allow_surrogates=True): + return unicode_encode_utf_32_helper(s, errors, errorhandler, allow_surrogates, "native", 'utf-32-' + BYTEORDER2) -def unicode_encode_utf_32_be(s, size, errors, +def utf8_encode_utf_32_be(s, errors, errorhandler=None, allow_surrogates=True): - return unicode_encode_utf_32_helper(s, size, errors, errorhandler, + return unicode_encode_utf_32_helper(s, errors, errorhandler, allow_surrogates, "big", 'utf-32-be') -def unicode_encode_utf_32_le(s, size, errors, +def utf8_encode_utf_32_le(s, errors, errorhandler=None, allow_surrogates=True): - return unicode_encode_utf_32_helper(s, size, errors, errorhandler, + return unicode_encode_utf_32_helper(s, errors, errorhandler, allow_surrogates, "little", 'utf-32-le') +# ____________________________________________________________ +# unicode-internal + +def str_decode_unicode_internal(s, errors, final=False, + errorhandler=None): + size = len(s) + if size == 0: + return '', 0, 0 + + unicode_bytes = 4 + if BYTEORDER == "little": + start = 0 + stop = unicode_bytes + step = 1 + else: + start = unicode_bytes - 1 + stop = -1 + step = -1 + + result = StringBuilder(size) + pos = 0 + while pos < size: + if pos > size - unicode_bytes: + res, pos = errorhandler(errors, "unicode_internal", + "truncated input", + s, pos, size) + result.append(res) + if pos > size - unicode_bytes: + break + continue + t = r_uint(0) + h = 0 + for j in range(start, stop, step): + t += r_uint(ord(s[pos + j])) << (h*8) + h += 1 + if t > 0x10ffff: + res, pos = errorhandler(errors, "unicode_internal", + "unichr(%d) not in range" % (t,), + s, pos, pos + unicode_bytes) + result.append(res) + continue + rutf8.unichr_as_utf8_append(result, intmask(t), allow_surrogates=True) + pos += unicode_bytes + r = result.build() + lgt = rutf8.check_utf8(r, True) + return r, pos, lgt + +def utf8_encode_unicode_internal(s, errors, errorhandler): + size = len(s) + if size == 0: + return '' + + result = StringBuilder(size * 4) + pos = 0 + while pos < size: + oc = rutf8.codepoint_at_pos(s, pos) + if BYTEORDER == "little": + result.append(chr(oc & 0xFF)) + result.append(chr(oc >> 8 & 0xFF)) + result.append(chr(oc >> 16 & 0xFF)) + result.append(chr(oc >> 24 & 0xFF)) + else: + result.append(chr(oc >> 24 & 0xFF)) + result.append(chr(oc >> 16 & 0xFF)) + result.append(chr(oc >> 8 & 0xFF)) + result.append(chr(oc & 0xFF)) + pos = rutf8.next_codepoint_pos(s, pos) + + return result.build() + +# ____________________________________________________________ +# Charmap + +ERROR_CHAR = u'\ufffe'.encode('utf8') + [email protected](4) +def str_decode_charmap(s, errors, final=False, + errorhandler=None, mapping=None): + "mapping can be a rpython dictionary, or a dict-like object." + + # Default to Latin-1 + if mapping is None: + return str_decode_latin_1(s, errors, final=final, + errorhandler=errorhandler) + size = len(s) + if size == 0: + return '', 0, 0 + + pos = 0 + result = StringBuilder(size) + while pos < size: + ch = s[pos] + + c = mapping.get(ord(ch), ERROR_CHAR) + if c == ERROR_CHAR: + r, pos = errorhandler(errors, "charmap", + "character maps to <undefined>", + s, pos, pos + 1) + result.append(r) + continue + result.append(c) + pos += 1 + r = result.build() + lgt = rutf8.check_utf8(r, True) + return r, pos, lgt + +def utf8_encode_charmap(s, errors, errorhandler=None, mapping=None): + size = len(s) + if mapping is None: + return utf8_encode_latin_1(s, errors, errorhandler=errorhandler) + + if size == 0: + return '' + result = StringBuilder(size) + pos = 0 + index = 0 + while pos < size: + ch = rutf8.codepoint_at_pos(s, pos) + c = mapping.get(ch, '') + if len(c) == 0: + # collect all unencodable chars. + startindex = index + pos = rutf8.next_codepoint_pos(s, pos) + index += 1 + while (pos < size and + mapping.get(rutf8.codepoint_at_pos(s, pos), '') == ''): + pos = rutf8.next_codepoint_pos(s, pos) + index += 1 + res_8, newindex = errorhandler(errors, "charmap", + "character maps to <undefined>", + s, startindex, index) + for cp2 in rutf8.Utf8StringIterator(res_8): + ch2 = mapping.get(cp2, '') + if not ch2: + errorhandler( + "strict", "charmap", "character maps to <undefined>", + s, startindex, index) + result.append(ch2) + if index != newindex: # Should be uncommon + index = newindex + pos = rutf8._pos_at_index(s, newindex) + continue + result.append(c) + index += 1 + pos = rutf8.next_codepoint_pos(s, pos) + return result.build() + +# ____________________________________________________________ +# Decimal Encoder +def unicode_encode_decimal(s, errors, errorhandler=None): + """Converts whitespace to ' ', decimal characters to their + corresponding ASCII digit and all other Latin-1 characters except + \0 as-is. Characters outside this range (Unicode ordinals 1-256) + are treated as errors. This includes embedded NULL bytes. + """ + if errorhandler is None: + errorhandler = default_error_encode + result = StringBuilder(len(s)) _______________________________________________ pypy-commit mailing list [email protected] https://mail.python.org/mailman/listinfo/pypy-commit
