Author: Amaury Forgeot d'Arc <amaur...@gmail.com>
Branch: 
Changeset: r58847:830a3025e27b
Date: 2012-11-13 00:12 +0100
http://bitbucket.org/pypy/pypy/changeset/830a3025e27b/

Log:    Add RPython support for 'replace' and 'ignore' error handlers.

diff --git a/pypy/rlib/runicode.py b/pypy/rlib/runicode.py
--- a/pypy/rlib/runicode.py
+++ b/pypy/rlib/runicode.py
@@ -46,12 +46,20 @@
     ORD = ord
 
 
-def raise_unicode_exception_decode(errors, encoding, msg, s,
-                                   startingpos, endingpos):
+def default_unicode_error_decode(errors, encoding, msg, s,
+                                 startingpos, endingpos):
+    if errors == 'replace':
+        return u'\ufffd', endingpos
+    if errors == 'ignore':
+        return u'', endingpos
     raise UnicodeDecodeError(encoding, s, startingpos, endingpos, msg)
 
-def raise_unicode_exception_encode(errors, encoding, msg, u,
-                                   startingpos, endingpos):
+def default_unicode_error_encode(errors, encoding, msg, u,
+                                 startingpos, endingpos):
+    if errors == 'replace':
+        return u'?', endingpos
+    if errors == 'ignore':
+        return u'', endingpos
     raise UnicodeEncodeError(encoding, u, startingpos, endingpos, msg)
 
 # ____________________________________________________________
@@ -79,7 +87,7 @@
 def str_decode_utf_8(s, size, errors, final=False,
                      errorhandler=None, allow_surrogates=False):
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_decode
+        errorhandler = default_unicode_error_decode
     return str_decode_utf_8_impl(s, size, errors, final, errorhandler,
                                  allow_surrogates=allow_surrogates)
 
@@ -258,7 +266,7 @@
 def unicode_encode_utf_8(s, size, errors, errorhandler=None,
                          allow_surrogates=False):
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_encode
+        errorhandler = default_unicode_error_encode
     return unicode_encode_utf_8_impl(s, size, errors, errorhandler,
                                      allow_surrogates=allow_surrogates)
 
@@ -336,7 +344,7 @@
                              errorhandler=None,
                              byteorder="native"):
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_decode
+        errorhandler = default_unicode_error_decode
     bo = 0
 
     if BYTEORDER == 'little':
@@ -513,7 +521,7 @@
                              errorhandler=None,
                              byteorder="native"):
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_decode
+        errorhandler = default_unicode_error_decode
     bo = 0
 
     if BYTEORDER == 'little':
@@ -737,7 +745,7 @@
 def str_decode_utf_7(s, size, errors, final=False,
                      errorhandler=None):
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_decode
+        errorhandler = default_unicode_error_decode
     if size == 0:
         return u'', 0
 
@@ -925,7 +933,7 @@
 def str_decode_ascii(s, size, errors, final=False,
                      errorhandler=None):
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_decode
+        errorhandler = default_unicode_error_decode
     # ASCII is equivalent to the first 128 ordinals in Unicode.
     result = UnicodeBuilder(size)
     pos = 0
@@ -944,7 +952,7 @@
 def unicode_encode_ucs1_helper(p, size, errors,
                                errorhandler=None, limit=256):
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_encode
+        errorhandler = default_unicode_error_encode
     if limit == 256:
         reason = "ordinal not in range(256)"
         encoding = "latin-1"
@@ -1002,7 +1010,7 @@
         return str_decode_latin_1(s, size, errors, final=final,
                                   errorhandler=errorhandler)
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_decode
+        errorhandler = default_unicode_error_decode
     if size == 0:
         return u'', 0
 
@@ -1029,7 +1037,7 @@
                                       errorhandler=errorhandler)
 
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_encode
+        errorhandler = default_unicode_error_encode
 
     if size == 0:
         return ''
@@ -1102,7 +1110,7 @@
                               errorhandler=False,
                               unicodedata_handler=None):
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_decode
+        errorhandler = default_unicode_error_decode
 
     if size == 0:
         return u'', 0
@@ -1344,7 +1352,7 @@
 def str_decode_raw_unicode_escape(s, size, errors, final=False,
                                   errorhandler=None):
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_decode
+        errorhandler = default_unicode_error_decode
     if size == 0:
         return u'', 0
 
@@ -1429,7 +1437,7 @@
 def str_decode_unicode_internal(s, size, errors, final=False,
                                 errorhandler=None):
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_decode
+        errorhandler = default_unicode_error_decode
     if size == 0:
         return u'', 0
 
@@ -1540,7 +1548,7 @@
             return u"", 0
 
         if errorhandler is None:
-            errorhandler = raise_unicode_exception_decode
+            errorhandler = default_unicode_error_decode
 
         # Skip trailing lead-byte unless 'final' is set
         if not final and is_dbcs_lead_byte(s[size-1]):
@@ -1604,7 +1612,7 @@
     are treated as errors. This includes embedded NULL bytes.
     """
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_encode
+        errorhandler = default_unicode_error_encode
     if size == 0:
         return ''
     result = StringBuilder(size)
diff --git a/pypy/rlib/test/test_runicode.py b/pypy/rlib/test/test_runicode.py
--- a/pypy/rlib/test/test_runicode.py
+++ b/pypy/rlib/test/test_runicode.py
@@ -146,6 +146,10 @@
     def test_ascii_error(self):
         self.checkdecodeerror("abc\xFF\xFF\xFFcde", "ascii", 3, 4)
 
+    def test_decode_replace(self):
+        decoder = self.getdecoder('utf-8')
+        assert decoder('caf\xe9', 4, 'replace', True) == (u'caf\ufffd', 4)
+
     def test_utf16_errors(self):
         # trunkated BOM
         for s in ["\xff", "\xfe"]:
@@ -231,12 +235,6 @@
     def __init__(self):
         self.decoder = self.getdecoder('utf-8')
 
-    def replace_handler(self, errors, codec, message, input, start, end):
-        return u'\ufffd', end
-
-    def ignore_handler(self, errors, codec, message, input, start, end):
-        return u'', end
-
     def to_bytestring(self, bytes):
         return ''.join(chr(int(c, 16)) for c in bytes.split())
 
@@ -261,16 +259,13 @@
             raises(UnicodeDecodeError, self.decoder, byte, 1, None, final=True)
             self.checkdecodeerror(byte, 'utf-8', 0, 1, addstuff=False,
                                   msg='invalid start byte')
-            assert self.decoder(byte, 1, None, final=True,
-                       errorhandler=self.replace_handler) == (FFFD, 1)
-            assert (self.decoder('aaaa' + byte + 'bbbb', 9, None,
-                        final=True, errorhandler=self.replace_handler) ==
+            assert self.decoder(byte, 1, 'replace', final=True) == (FFFD, 1)
+            assert (self.decoder('aaaa' + byte + 'bbbb', 9, 'replace',
+                        final=True) ==
                         (u'aaaa'+ FFFD + u'bbbb', 9))
-            assert self.decoder(byte, 1, None, final=True,
-                           errorhandler=self.ignore_handler) == (u'', 1)
-            assert (self.decoder('aaaa' + byte + 'bbbb', 9, None,
-                        final=True, errorhandler=self.ignore_handler) ==
-                        (u'aaaabbbb', 9))
+            assert self.decoder(byte, 1, 'ignore', final=True) == (u'', 1)
+            assert (self.decoder('aaaa' + byte + 'bbbb', 9, 'ignore',
+                        final=True) == (u'aaaabbbb', 9))
 
     def test_unexpected_end_of_data(self):
         """
@@ -300,16 +295,15 @@
                    None, final=True)
             self.checkdecodeerror(seq, 'utf-8', 0, len(seq), addstuff=False,
                                   msg='unexpected end of data')
-            assert self.decoder(seq, len(seq), None, final=True,
-                       errorhandler=self.replace_handler) == (FFFD, len(seq))
-            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
-                        final=True, errorhandler=self.replace_handler) ==
+            assert self.decoder(seq, len(seq), 'replace', final=True
+                                ) == (FFFD, len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8,
+                                 'replace', final=True) ==
                         (u'aaaa'+ FFFD + u'bbbb', len(seq) + 8))
-            assert self.decoder(seq, len(seq), None, final=True,
-                           errorhandler=self.ignore_handler) == (u'', len(seq))
-            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
-                        final=True, errorhandler=self.ignore_handler) ==
-                        (u'aaaabbbb', len(seq) + 8))
+            assert self.decoder(seq, len(seq), 'ignore', final=True
+                                ) == (u'', len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, 'ignore',
+                        final=True) == (u'aaaabbbb', len(seq) + 8))
 
     def test_invalid_cb_for_2bytes_seq(self):
         """
@@ -335,16 +329,16 @@
                    None, final=True)
             self.checkdecodeerror(seq, 'utf-8', 0, 1, addstuff=False,
                                   msg='invalid continuation byte')
-            assert self.decoder(seq, len(seq), None, final=True,
-                       errorhandler=self.replace_handler) == (res, len(seq))
-            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
-                        final=True, errorhandler=self.replace_handler) ==
+            assert self.decoder(seq, len(seq), 'replace', final=True
+                                ) == (res, len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8,
+                                 'replace', final=True) ==
                         (u'aaaa' + res + u'bbbb', len(seq) + 8))
             res = res.replace(FFFD, u'')
-            assert self.decoder(seq, len(seq), None, final=True,
-                           errorhandler=self.ignore_handler) == (res, len(seq))
-            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
-                        final=True, errorhandler=self.ignore_handler) ==
+            assert self.decoder(seq, len(seq), 'ignore', final=True
+                                ) == (res, len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8,
+                                 'ignore', final=True) ==
                         (u'aaaa' + res + u'bbbb', len(seq) + 8))
 
     def test_invalid_cb_for_3bytes_seq(self):
@@ -407,17 +401,16 @@
                    None, final=True)
             self.checkdecodeerror(seq, 'utf-8', 0, len(seq)-1, addstuff=False,
                                   msg='invalid continuation byte')
-            assert self.decoder(seq, len(seq), None, final=True,
-                       errorhandler=self.replace_handler) == (res, len(seq))
-            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
-                        final=True, errorhandler=self.replace_handler) ==
+            assert self.decoder(seq, len(seq), 'replace', final=True
+                                ) == (res, len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8,
+                                 'replace', final=True) ==
                         (u'aaaa' + res + u'bbbb', len(seq) + 8))
             res = res.replace(FFFD, u'')
-            assert self.decoder(seq, len(seq), None, final=True,
-                           errorhandler=self.ignore_handler) == (res, len(seq))
-            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
-                        final=True, errorhandler=self.ignore_handler) ==
-                        (u'aaaa' + res + u'bbbb', len(seq) + 8))
+            assert self.decoder(seq, len(seq), 'ignore', final=True
+                                ) == (res, len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, 'ignore',
+                        final=True) == (u'aaaa' + res + u'bbbb', len(seq) + 8))
 
     def test_invalid_cb_for_4bytes_seq(self):
         """
@@ -500,17 +493,16 @@
                    None, final=True)
             self.checkdecodeerror(seq, 'utf-8', 0, len(seq)-1, addstuff=False,
                                   msg='invalid continuation byte')
-            assert self.decoder(seq, len(seq), None, final=True,
-                       errorhandler=self.replace_handler) == (res, len(seq))
-            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
-                        final=True, errorhandler=self.replace_handler) ==
+            assert self.decoder(seq, len(seq), 'replace', final=True
+                                ) == (res, len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, 
+                                 'replace', final=True) ==
                         (u'aaaa' + res + u'bbbb', len(seq) + 8))
             res = res.replace(FFFD, u'')
-            assert self.decoder(seq, len(seq), None, final=True,
-                           errorhandler=self.ignore_handler) == (res, len(seq))
-            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
-                        final=True, errorhandler=self.ignore_handler) ==
-                        (u'aaaa' + res + u'bbbb', len(seq) + 8))
+            assert self.decoder(seq, len(seq), 'ignore', final=True
+                                ) == (res, len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, 'ignore',
+                        final=True) == (u'aaaa' + res + u'bbbb', len(seq) + 8))
 
     def test_utf8_errors(self):
         # unexpected end of data
@@ -628,22 +620,15 @@
         for n, (seq, res) in enumerate(sequences):
             decoder = self.getdecoder('utf-8')
             raises(UnicodeDecodeError, decoder, seq, len(seq), None, final=True)
-            assert decoder(seq, len(seq), None, final=True,
-                           errorhandler=self.replace_handler) == (res, len(seq))
-            assert decoder(seq + 'b', len(seq) + 1, None, final=True,
-                           errorhandler=self.replace_handler) == (res + u'b',
-                                                                  len(seq) + 1)
+            assert decoder(seq, len(seq), 'replace', final=True
+                           ) == (res, len(seq))
+            assert decoder(seq + 'b', len(seq) + 1, 'replace', final=True
+                           ) == (res + u'b', len(seq) + 1)
             res = res.replace(FFFD, u'')
-            assert decoder(seq, len(seq), None, final=True,
-                           errorhandler=self.ignore_handler) == (res, len(seq))
+            assert decoder(seq, len(seq), 'ignore', final=True
+                           ) == (res, len(seq))
 
 class TestEncoding(UnicodeTests):
-    def replace_handler(self, errors, codec, message, input, start, end):
-        if errors=='strict':
-            runicode.raise_unicode_exception_encode(errors, codec, message,
-                                                    input, start, end)
-        return u'?', end
-
     def test_all_ascii(self):
         for i in range(128):
             if sys.version >= "2.7":
@@ -723,7 +708,7 @@
         encoder = self.getencoder('decimal')
         assert encoder(u' 12, 34 ', 8, None) == ' 12, 34 '
         raises(UnicodeEncodeError, encoder, u' 12, \u1234 ', 7, None)
-        assert encoder(u'u\u1234', 2, 'replace', self.replace_handler) == 'u?'
+        assert encoder(u'u\u1234', 2, 'replace') == 'u?'
 
 class TestTranslation(object):
     def setup_class(cls):
_______________________________________________
pypy-commit mailing list
pypy-commit@python.org
http://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to