Author: Armin Rigo <ar...@tunes.org> Branch: unicode-utf8 Changeset: r92248:bf0d9ddd4a6e Date: 2017-08-24 14:39 +0200 http://bitbucket.org/pypy/pypy/changeset/bf0d9ddd4a6e/
Log: (arigo, fijal climbing) Clean up rutf8.py diff --git a/rpython/rlib/rutf8.py b/rpython/rlib/rutf8.py --- a/rpython/rlib/rutf8.py +++ b/rpython/rlib/rutf8.py @@ -1,60 +1,70 @@ +""" This file is about supporting unicode strings in RPython, +represented by a byte string that is exactly the UTF-8 version +(for some definition of UTF-8). +This doesn't support Python 2's unicode characters beyond 0x10ffff, +which are theoretically possible to obtain using strange tricks like +the array or ctypes modules. + +Fun comes from surrogates. Various functions don't normally accept +any unicode character betwen 0xd800 and 0xdfff, but do if you give +the 'allow_surrogates = True' flag. +""" + +from rpython.rlib.objectmodel import enforceargs from rpython.rlib.rstring import StringBuilder -from rpython.rlib import runicode, jit +from rpython.rlib import jit +from rpython.rlib.rarithmetic import r_uint -def unichr_as_utf8(code): - """ Encode code (numeric value) as utf8 encoded string + +def unichr_as_utf8(code, allow_surrogates=False): + """Encode code (numeric value) as utf8 encoded string """ - if code < 0: - raise ValueError - lgt = 1 - if code >= runicode.MAXUNICODE: - lgt = 2 - if code < 0x80: + code = r_uint(code) + if code <= r_uint(0x7F): # Encode ASCII - return chr(code), 1 - if code < 0x0800: - # Encode Latin-1 - return chr((0xc0 | (code >> 6))) + chr((0x80 | (code & 0x3f))), lgt - if code < 0x10000: + return chr(code) + if code <= r_uint(0x07FF): + return chr((0xc0 | (code >> 6))) + chr((0x80 | (code & 0x3f))) + if code <= r_uint(0xFFFF): + if not allow_surrogates and 0xD800 <= code <= 0xDfff: + raise ValueError return (chr((0xe0 | (code >> 12))) + chr((0x80 | ((code >> 6) & 0x3f))) + - chr((0x80 | (code & 0x3f)))), lgt - if code < 0x10ffff: + chr((0x80 | (code & 0x3f)))) + if code <= r_uint(0x10FFFF): return (chr((0xf0 | (code >> 18))) + chr((0x80 | ((code >> 12) & 0x3f))) + chr((0x80 | ((code >> 6) & 0x3f))) + - chr((0x80 | (code & 0x3f)))), lgt + chr((0x80 | (code & 0x3f)))) raise ValueError -def unichr_as_utf8_append(builder, code): - """ Encode code (numeric value) as utf8 encoded string +def unichr_as_utf8_append(builder, code, allow_surrogates=False): + """Encode code (numeric value) as utf8 encoded string + and emit the result into the given StringBuilder. """ - if code < 0: - raise ValueError - lgt = 1 - if code >= runicode.MAXUNICODE: - lgt = 2 - if code < 0x80: + code = r_uint(code) + if code <= r_uint(0x7F): # Encode ASCII builder.append(chr(code)) - return 1 - if code < 0x0800: - # Encode Latin-1 + return + if code <= r_uint(0x07FF): builder.append(chr((0xc0 | (code >> 6)))) builder.append(chr((0x80 | (code & 0x3f)))) - return lgt - if code < 0x10000: + return + if code <= r_uint(0xFFFF): + if not allow_surrogates and 0xd800 <= code <= 0xdfff: + raise ValueError builder.append(chr((0xe0 | (code >> 12)))) builder.append(chr((0x80 | ((code >> 6) & 0x3f)))) builder.append(chr((0x80 | (code & 0x3f)))) - return lgt - if code < 0x10ffff: + return + if code <= r_uint(0x10FFFF): builder.append(chr((0xf0 | (code >> 18)))) builder.append(chr((0x80 | ((code >> 12) & 0x3f)))) builder.append(chr((0x80 | ((code >> 6) & 0x3f)))) builder.append(chr((0x80 | (code & 0x3f)))) - return lgt + return raise ValueError # note - table lookups are really slow. Measured on various elements of obama @@ -62,61 +72,64 @@ # In extreme cases (small, only chinese text), they're 40% slower def next_codepoint_pos(code, pos): - """ Gives the position of the next codepoint after pos, -1 - if it's the last one (assumes valid utf8) + """Gives the position of the next codepoint after pos. + Assumes valid utf8. 'pos' must be before the end of the string. """ chr1 = ord(code[pos]) - if chr1 < 0x80: + if chr1 <= 0x7F: return pos + 1 - if 0xC2 <= chr1 <= 0xDF: + if chr1 <= 0xDF: return pos + 2 - if chr1 >= 0xE0 and chr1 <= 0xEF: + if chr1 <= 0xEF: return pos + 3 return pos + 4 def prev_codepoint_pos(code, pos): - """ Gives the position of the previous codepoint + """Gives the position of the previous codepoint. + 'pos' must not be zero. """ pos -= 1 chr1 = ord(code[pos]) - if chr1 < 0x80: + if chr1 <= 0x7F: return pos - while ord(code[pos]) & 0xC0 == 0x80: - pos -= 1 + pos -= 1 + if ord(code[pos]) >= 0xC0: + return pos + pos -= 1 + if ord(code[pos]) >= 0xC0: + return pos + pos -= 1 return pos def compute_length_utf8(s): - pos = 0 - lgt = 0 - while pos < len(s): - pos = next_codepoint_pos(s, pos) - lgt += 1 - return lgt + continuation_bytes = 0 + for i in range(len(s)): + if 0x80 <= ord(s[i]) <= 0xBF: # count the continuation bytes + continuation_bytes += 1 + return len(s) - continuation_bytes def codepoint_at_pos(code, pos): """ Give a codepoint in code at pos - assumes valid utf8, no checking! """ ordch1 = ord(code[pos]) - if ordch1 < 0x80: + if ordch1 <= 0x7F: return ordch1 - n = ord(runicode._utf8_code_length[ordch1 - 0x80]) - if n == 2: - ordch2 = ord(code[pos+1]) + ordch2 = ord(code[pos+1]) + if ordch1 <= 0xDF: # 110yyyyy 10zzzzzz -> 00000000 00000yyy yyzzzzzz return (((ordch1 & 0x1F) << 6) + # 0b00011111 (ordch2 & 0x3F)) # 0b00111111 - elif n == 3: - ordch2 = ord(code[pos+1]) - ordch3 = ord(code[pos+2]) + + ordch3 = ord(code[pos+2]) + if ordch1 <= 0xEF: # 1110xxxx 10yyyyyy 10zzzzzz -> 00000000 xxxxyyyy yyzzzzzz return (((ordch1 & 0x0F) << 12) + # 0b00001111 ((ordch2 & 0x3F) << 6) + # 0b00111111 (ordch3 & 0x3F)) # 0b00111111 - elif n == 4: - ordch2 = ord(code[pos+1]) - ordch3 = ord(code[pos+2]) - ordch4 = ord(code[pos+3]) + + ordch4 = ord(code[pos+3]) + if True: # 11110www 10xxxxxx 10yyyyyy 10zzzzzz -> 000wwwxx xxxxyyyy yyzzzzzz return (((ordch1 & 0x07) << 18) + # 0b00000111 ((ordch2 & 0x3F) << 12) + # 0b00111111 @@ -124,46 +137,44 @@ (ordch4 & 0x3F)) # 0b00111111 assert False, "unreachable" -class AsciiCheckError(Exception): - def __init__(self, pos): - self.pos = pos +class CheckError(Exception): + pass -def check_ascii(s, size=-1): - if size == -1: - size = len(s) - for i in range(0, size): - if ord(s[i]) & 0x80: - raise AsciiCheckError(i) +@jit.elidable +def check_ascii(s): + for i in range(len(s)): + if ord(s[i]) > 0x7F: + raise CheckError -def utf8_encode_ascii(s, errors, encoding, msg, errorhandler): - res = StringBuilder(len(s)) - u_pos = 0 - pos = 0 - while pos < len(s): - chr1 = s[pos] - if ord(chr1) < 0x80: - res.append(chr1) - else: - repl, _, _, _ = errorhandler(errors, encoding, msg, s, u_pos, u_pos + 1) - res.append(repl) - u_pos += 1 - pos = next_codepoint_pos(s, pos) - return res.build() +#def utf8_encode_ascii(s, errors, encoding, msg, errorhandler): +# res = StringBuilder(len(s)) +# u_pos = 0 +# pos = 0 +# while pos < len(s): +# chr1 = s[pos] +# if ord(chr1) < 0x80: +# res.append(chr1) +# else: +# repl, _, _, _ = errorhandler(errors, encoding, msg, s, u_pos, u_pos + 1) +# res.append(repl) +# u_pos += 1 +# pos = next_codepoint_pos(s, pos) +# return res.build() -def str_decode_ascii(s, size, errors, errorhandler): - # ASCII is equivalent to the first 128 ordinals in Unicode. - result = StringBuilder(size) - pos = 0 - while pos < size: - c = s[pos] - if ord(c) < 128: - result.append(c) - else: - r, _, _ = errorhandler(errors, "ascii", "ordinal not in range(128)", - s, pos, pos + 1) - result.append(r) - pos += 1 - return result.build(), pos, -1 +#def str_decode_ascii(s, size, errors, errorhandler): +# # ASCII is equivalent to the first 128 ordinals in Unicode. +# result = StringBuilder(size) +# pos = 0 +# while pos < size: +# c = s[pos] +# if ord(c) < 128: +# result.append(c) +# else: +# r, _, _ = errorhandler(errors, "ascii", "ordinal not in range(128)", +# s, pos, pos + 1) +# result.append(r) +# pos += 1 +# return result.build(), pos, -1 def islinebreak(s, pos): chr1 = ord(s[pos]) @@ -217,149 +228,92 @@ return True return False -def utf8_in_chars(value, pos, chars): - """ equivalent of u'x' in u'xyz', just done in utf8 - """ - lgt = next_codepoint_pos(value, pos) - pos - i = 0 - while i < len(chars): - j = next_codepoint_pos(chars, i) - if j - i != lgt: - i = j - continue - for k in range(lgt): - if value[k + pos] != chars[i + k]: - break - else: - return True - i = j - return False -class Utf8CheckError(Exception): - def __init__(self, msg, startpos, endpos): - self.msg = msg - self.startpos = startpos - self.endpos = endpos +def _invalid_cont_byte(ordch): + return ordch>>6 != 0x2 # 0b10 + +_invalid_byte_2_of_2 = _invalid_cont_byte +_invalid_byte_3_of_3 = _invalid_cont_byte +_invalid_byte_3_of_4 = _invalid_cont_byte +_invalid_byte_4_of_4 = _invalid_cont_byte + +@enforceargs(allow_surrogates=bool) +def _invalid_byte_2_of_3(ordch1, ordch2, allow_surrogates): + return (ordch2>>6 != 0x2 or # 0b10 + (ordch1 == 0xe0 and ordch2 < 0xa0) + # surrogates shouldn't be valid UTF-8! + or (ordch1 == 0xed and ordch2 > 0x9f and not allow_surrogates)) + +def _invalid_byte_2_of_4(ordch1, ordch2): + return (ordch2>>6 != 0x2 or # 0b10 + (ordch1 == 0xf0 and ordch2 < 0x90) or + (ordch1 == 0xf4 and ordch2 > 0x8f)) + @jit.elidable -def str_check_utf8(s, size, final=False, - allow_surrogates=runicode.allow_surrogate_by_default): - """ A simplified version of utf8 encoder - it only works with 'strict' - error handling. +def check_utf8(s, allow_surrogates=False): + """Check that 's' is a utf-8-encoded byte string. + Returns the length (number of chars) or raise CheckError. + Note that surrogates are not handled specially here. """ - # XXX do the following in a cleaner way, e.g. via signature - # NB. a bit messy because rtyper/rstr.py also calls the same - # function. Make sure we annotate for the args it passes, too - #if NonConstant(False): - # s = NonConstant('?????') - # size = NonConstant(12345) - # errors = NonConstant('strict') - # final = NonConstant(True) - # errorhandler = ll_unicode_error_decode - # allow_surrogates = NonConstant(True) - if size == 0: - return 0, 0 - pos = 0 - lgt = 0 - while pos < size: + continuation_bytes = 0 + while pos < len(s): ordch1 = ord(s[pos]) + pos += 1 # fast path for ASCII - # XXX maybe use a while loop here - if ordch1 < 0x80: - lgt += 1 - pos += 1 + if ordch1 <= 0x7F: continue - n = ord(runicode._utf8_code_length[ordch1 - 0x80]) - if pos + n > size: - if not final: - break - # argh, this obscure block of code is mostly a copy of - # what follows :-( - charsleft = size - pos - 1 # either 0, 1, 2 - # note: when we get the 'unexpected end of data' we need - # to care about the pos returned; it can be lower than size, - # in case we need to continue running this loop - if not charsleft: - # there's only the start byte and nothing else - raise Utf8CheckError('unexpected end of data', pos, pos + 1) - ordch2 = ord(s[pos+1]) - if n == 3: - # 3-bytes seq with only a continuation byte - if runicode._invalid_byte_2_of_3(ordch1, ordch2, allow_surrogates): - # second byte invalid, take the first and continue - raise Utf8CheckError('invalid continuation byte', pos, - pos + 1) - else: - # second byte valid, but third byte missing - raise Utf8CheckError('unexpected end of data', pos, pos + 2) - elif n == 4: - # 4-bytes seq with 1 or 2 continuation bytes - if runicode._invalid_byte_2_of_4(ordch1, ordch2): - # second byte invalid, take the first and continue - raise Utf8CheckError('invalid continuation byte', pos, - pos + 1) - elif charsleft == 2 and runicode._invalid_byte_3_of_4(ord(s[pos+2])): - # third byte invalid, take the first two and continue - raise Utf8CheckError('invalid continuation byte', pos, - pos + 2) - else: - # there's only 1 or 2 valid cb, but the others are missing - raise Utf8CheckError('unexpected end of data', pos, - pos + charsleft + 1) - raise AssertionError("unreachable") + if ordch1 <= 0xC1: + raise CheckError - if n == 0: - raise Utf8CheckError('invalid start byte', pos, pos + 1) - elif n == 1: - assert 0, "ascii should have gone through the fast path" + if ordch1 <= 0xDF: + continuation_bytes += 1 + if pos >= len(s): + raise CheckError + ordch2 = ord(s[pos]) + pos += 1 - elif n == 2: - ordch2 = ord(s[pos+1]) - if runicode._invalid_byte_2_of_2(ordch2): - raise Utf8CheckError('invalid continuation byte', pos, - pos + 2) + if _invalid_byte_2_of_2(ordch2): + raise CheckError # 110yyyyy 10zzzzzz -> 00000000 00000yyy yyzzzzzz - lgt += 1 + continue + + if ordch1 <= 0xEF: + continuation_bytes += 2 + if (pos + 2) > len(s): + raise CheckError + ordch2 = ord(s[pos]) + ordch3 = ord(s[pos + 1]) pos += 2 - elif n == 3: - ordch2 = ord(s[pos+1]) - ordch3 = ord(s[pos+2]) - if runicode._invalid_byte_2_of_3(ordch1, ordch2, allow_surrogates): - raise Utf8CheckError('invalid continuation byte', pos, - pos + 1) - elif runicode._invalid_byte_3_of_3(ordch3): - raise Utf8CheckError('invalid continuation byte', pos, - pos + 2) + if _invalid_byte_2_of_3(ordch1, ordch2, allow_surrogates): + raise CheckError + elif _invalid_byte_3_of_3(ordch3): + raise CheckError # 1110xxxx 10yyyyyy 10zzzzzz -> 00000000 xxxxyyyy yyzzzzzz - lgt += 1 + continue + + if ordch1 <= 0xF4: + continuation_bytes += 3 + if (pos + 3) > len(s): + raise CheckError + ordch2 = ord(s[pos]) + ordch3 = ord(s[pos + 1]) + ordch4 = ord(s[pos + 2]) pos += 3 - elif n == 4: - ordch2 = ord(s[pos+1]) - ordch3 = ord(s[pos+2]) - ordch4 = ord(s[pos+3]) - if runicode._invalid_byte_2_of_4(ordch1, ordch2): - raise Utf8CheckError('invalid continuation byte', pos, - pos + 1) - elif runicode._invalid_byte_3_of_4(ordch3): - raise Utf8CheckError('invalid continuation byte', pos, - pos + 2) - elif runicode._invalid_byte_4_of_4(ordch4): - raise Utf8CheckError('invalid continuation byte', pos, - pos + 3) + if _invalid_byte_2_of_4(ordch1, ordch2): + raise CheckError + elif _invalid_byte_3_of_4(ordch3): + raise CheckError + elif _invalid_byte_4_of_4(ordch4): + raise CheckError # 11110www 10xxxxxx 10yyyyyy 10zzzzzz -> 000wwwxx xxxxyyyy yyzzzzzz - c = (((ordch1 & 0x07) << 18) + # 0b00000111 - ((ordch2 & 0x3F) << 12) + # 0b00111111 - ((ordch3 & 0x3F) << 6) + # 0b00111111 - (ordch4 & 0x3F)) # 0b00111111 - if c <= runicode.MAXUNICODE: - lgt += 1 - else: - # append the two surrogates: - lgt += 2 - pos += 4 + continue - return pos, lgt + raise CheckError + + assert pos == len(s) + return pos - continuation_bytes diff --git a/rpython/rlib/test/test_rutf8.py b/rpython/rlib/test/test_rutf8.py --- a/rpython/rlib/test/test_rutf8.py +++ b/rpython/rlib/test/test_rutf8.py @@ -1,14 +1,18 @@ - +import py import sys from hypothesis import given, strategies, settings, example from rpython.rlib import rutf8, runicode -@given(strategies.integers(min_value=0, max_value=runicode.MAXUNICODE)) -def test_unichr_as_utf8(i): - u, lgt = rutf8.unichr_as_utf8(i) - r = runicode.UNICHR(i) - assert u == r.encode('utf8') + +@given(strategies.characters(), strategies.booleans()) +def test_unichr_as_utf8(c, allow_surrogates): + i = ord(c) + if not allow_surrogates and 0xD800 <= i <= 0xDFFF: + py.test.raises(ValueError, rutf8.unichr_as_utf8, i, allow_surrogates) + else: + u = rutf8.unichr_as_utf8(i, allow_surrogates) + assert u == c.encode('utf8') @given(strategies.binary()) def test_check_ascii(s): @@ -19,28 +23,32 @@ raised = True try: rutf8.check_ascii(s) - except rutf8.AsciiCheckError as a: + except rutf8.CheckError: assert raised - assert a.pos == e.start else: assert not raised -@given(strategies.binary()) -def test_str_check_utf8(s): +@given(strategies.binary(), strategies.booleans()) +def test_check_utf8(s, allow_surrogates): + _test_check_utf8(s, allow_surrogates) + +@given(strategies.text(), strategies.booleans()) +def test_check_utf8_valid(u, allow_surrogates): + _test_check_utf8(u.encode('utf-8'), allow_surrogates) + +def _test_check_utf8(s, allow_surrogates): try: - u, _ = runicode.str_decode_utf_8(s, len(s), None, final=True) + u, _ = runicode.str_decode_utf_8(s, len(s), None, final=True, + allow_surrogates=allow_surrogates) valid = True except UnicodeDecodeError as e: valid = False try: - consumed, length = rutf8.str_check_utf8(s, len(s), final=True) - except rutf8.Utf8CheckError as a: + length = rutf8.check_utf8(s, allow_surrogates) + except rutf8.CheckError: assert not valid - assert a.startpos == e.start - # assert a.end == e.end, ideally else: assert valid - assert consumed == len(s) assert length == len(u) @given(strategies.characters()) @@ -80,5 +88,5 @@ response = True else: response = False - r = rutf8.utf8_in_chars(unichr(i).encode('utf8'), 0, uni.encode('utf8')) + r = unichr(i).encode('utf8') in uni.encode('utf8') assert r == response _______________________________________________ pypy-commit mailing list pypy-commit@python.org https://mail.python.org/mailman/listinfo/pypy-commit