Author: cito
Date: Mon Nov 23 14:15:43 2015
New Revision: 613

Log:
Add some more testing of unicode issues

Modified:
   trunk/module/TEST_PyGreSQL_classic_connection.py

Modified: trunk/module/TEST_PyGreSQL_classic_connection.py
==============================================================================
--- trunk/module/TEST_PyGreSQL_classic_connection.py    Mon Nov 23 14:14:24 2015        (r612)
+++ trunk/module/TEST_PyGreSQL_classic_connection.py    Mon Nov 23 14:15:43 2015        (r613)
@@ -27,6 +27,9 @@
 
 # We need a database to test against.  If LOCAL_PyGreSQL.py exists we will
 # get our information from that.  Otherwise we use the defaults.
+# These tests should be run with various PostgreSQL versions and databases
+# created with different encodings and locales.  Particularly, make sure the
+# tests are running against databases created with both SQL_ASCII and UTF8.
 dbname = 'unittest'
 dbhost = None
 dbport = 5432
@@ -41,6 +44,11 @@
 except NameError:  # Python >= 3.0
     long = int
 
+try:
+    unicode
+except NameError:  # Python >= 3.0
+    unicode = str
+
 unicode_strings = str is not bytes
 
 windows = os.name == 'nt'
@@ -540,7 +548,7 @@
 
     def setUp(self):
         self.c = connect()
-        self.c.query('set client_encoding = utf8')
+        self.c.query('set client_encoding=utf8')
 
     def tearDown(self):
         self.c.close()
@@ -613,13 +621,21 @@
 
     def testQueryWithUnicodeParams(self):
         query = self.c.query
-        query('set client_encoding = utf8')
+        try:
+            query('set client_encoding=utf8')
+            query("select 'wörld'").getresult()[0][0] == 'wörld'
+        except pg.ProgrammingError:
+            self.skipTest("database does not support utf8")
         self.assertEqual(query("select $1||', '||$2||'!'",
             ('Hello', u'wörld')).getresult(), [('Hello, wörld!',)])
-        self.assertEqual(query("select $1||', '||$2||'!'",
-            ('Hello', u'мир')).getresult(),
-            [('Hello, мир!',)])
-        query('set client_encoding = latin1')
+
+    def testQueryWithUnicodeParamsLatin1(self):
+        query = self.c.query
+        try:
+            query('set client_encoding=latin1')
+            query("select 'wörld'").getresult()[0][0] == 'wörld'
+        except pg.ProgrammingError:
+            self.skipTest("database does not support latin1")
         r = query("select $1||', '||$2||'!'", ('Hello', u'wörld')).getresult()
         if unicode_strings:
             self.assertEqual(r, [('Hello, wörld!',)])
@@ -627,25 +643,37 @@
             self.assertEqual(r, [(u'Hello, wörld!'.encode('latin1'),)])
         self.assertRaises(UnicodeError, query, "select $1||', '||$2||'!'",
             ('Hello', u'мир'))
-        query('set client_encoding = iso_8859_1')
-        r = query("select $1||', '||$2||'!'", ('Hello', u'wörld')).getresult()
+        query('set client_encoding=iso_8859_1')
+        r = query("select $1||', '||$2||'!'",
+            ('Hello', u'wörld')).getresult()
         if unicode_strings:
             self.assertEqual(r, [('Hello, wörld!',)])
         else:
             self.assertEqual(r, [(u'Hello, wörld!'.encode('latin1'),)])
         self.assertRaises(UnicodeError, query, "select $1||', '||$2||'!'",
             ('Hello', u'мир'))
-        query('set client_encoding = iso_8859_5')
+        query('set client_encoding=sql_ascii')
         self.assertRaises(UnicodeError, query, "select $1||', '||$2||'!'",
             ('Hello', u'wörld'))
-        r = query("select $1||', '||$2||'!'", ('Hello', u'мир')).getresult()
+
+    def testQueryWithUnicodeParamsCyrillic(self):
+        query = self.c.query
+        try:
+            query('set client_encoding=iso_8859_5')
+            query("select 'мир'").getresult()[0][0] == 'мир'
+        except pg.ProgrammingError:
+            self.skipTest("database does not support cyrillic")
+        self.assertRaises(UnicodeError, query, "select $1||', '||$2||'!'",
+            ('Hello', u'wörld'))
+        r = query("select $1||', '||$2||'!'",
+            ('Hello', u'мир')).getresult()
         if unicode_strings:
             self.assertEqual(r, [('Hello, мир!',)])
         else:
             self.assertEqual(r, [(u'Hello, мир!'.encode('cyrillic'),)])
-        query('set client_encoding = sql_ascii')
+        query('set client_encoding=sql_ascii')
         self.assertRaises(UnicodeError, query, "select $1||', '||$2||'!'",
-            ('Hello', u'wörld'))
+            ('Hello', u'мир!'))
 
     def testQueryWithMixedParams(self):
         self.assertEqual(self.c.query("select $1+2,$2||', world!'",
@@ -689,6 +717,11 @@
             "i2 smallint, i4 integer, i8 bigint, b boolean, dt date, ti time,"
             "d numeric, f4 real, f8 double precision, m money,"
             "c char(1), v4 varchar(4), c4 char(4), t text)")
+        # Check whether the test database uses SQL_ASCII - this means
+        # that it does not consider encoding when calculating lengths.
+        c.query("set client_encoding=utf8")
+        cls.has_encoding = c.query(
+            "select length('ä') - length('a')").getresult()[0][0] == 0
         c.close()
 
     @classmethod
@@ -717,7 +750,15 @@
         (2, 2, long(2), False, '1903-12-17', '11:22:00',
             2.345678, 2.25, 2.125, '2.75', 'y', 'q', 'ijk', 'mnop\nstux!')]
 
-    def get_back(self):
+    @classmethod
+    def db_len(cls, s, encoding):
+        if cls.has_encoding:
+            s = s if isinstance(s, unicode) else s.decode(encoding)
+        else:
+            s = s.encode(encoding) if isinstance(s, unicode) else s
+        return len(s)
+
+    def get_back(self, encoding='utf-8'):
         """Convert boolean and decimal values back."""
         data = []
         for row in self.c.query("select * from test order by 1").getresult():
@@ -751,13 +792,13 @@
                 row[9] = str(float(row[9]))
             if row[10] is not None:  # char(1)
                 self.assertIsInstance(row[10], str)
-                self.assertEqual(len(row[10]), 1)
+                self.assertEqual(self.db_len(row[10], encoding), 1)
             if row[11] is not None:  # varchar(4)
                 self.assertIsInstance(row[11], str)
-                self.assertLessEqual(len(row[11]), 4)
+                self.assertLessEqual(self.db_len(row[11], encoding), 4)
             if row[12] is not None:  # char(4)
                 self.assertIsInstance(row[12], str)
-                self.assertEqual(len(row[12]), 4)
+                self.assertEqual(self.db_len(row[12], encoding), 4)
                 row[12] = row[12].rstrip()
             if row[13] is not None:  # text
                 self.assertIsInstance(row[13], str)
@@ -767,18 +808,18 @@
 
     def testInserttable1Row(self):
         data = self.data[2:3]
-        self.c.inserttable("test", data)
+        self.c.inserttable('test', data)
         self.assertEqual(self.get_back(), data)
 
     def testInserttable4Rows(self):
         data = self.data
-        self.c.inserttable("test", data)
+        self.c.inserttable('test', data)
         self.assertEqual(self.get_back(), data)
 
     def testInserttableMultipleRows(self):
         num_rows = 100
         data = self.data[2:3] * num_rows
-        self.c.inserttable("test", data)
+        self.c.inserttable('test', data)
         r = self.c.query("select count(*) from test").getresult()[0][0]
         self.assertEqual(r, num_rows)
 
@@ -786,13 +827,13 @@
         num_rows = 10
         data = self.data[2:3]
         for _i in range(num_rows):
-            self.c.inserttable("test", data)
+            self.c.inserttable('test', data)
         r = self.c.query("select count(*) from test").getresult()[0][0]
         self.assertEqual(r, num_rows)
 
     def testInserttableNullValues(self):
         data = [(None,) * 14] * 100
-        self.c.inserttable("test", data)
+        self.c.inserttable('test', data)
         self.assertEqual(self.get_back(), data)
 
     def testInserttableMaxValues(self):
@@ -800,9 +841,101 @@
             True, '2999-12-31', '11:59:59', 1e99,
             1.0 + 1.0 / 32, 1.0 + 1.0 / 32, None,
             "1", "1234", "1234", "1234" * 100)]
-        self.c.inserttable("test", data)
+        self.c.inserttable('test', data)
+        self.assertEqual(self.get_back(), data)
+
+    def testInserttableByteValues(self):
+        try:
+            self.c.query("select '€', 'käse', 'сыр', 'pont-l''évêque'")
+        except pg.ProgrammingError:
+            self.skipTest("database does not support utf8")
+        # non-ascii chars do not fit in char(1) when there is no encoding
+        c = u'€' if self.has_encoding else u'$'
+        row_unicode = (0, 0, long(0), False, u'1970-01-01', u'00:00:00',
+            0.0, 0.0, 0.0, u'0.0',
+            c, u'bäd', u'bäd', u"käse сыр pont-l'évêque")
+        row_bytes = tuple(s.encode('utf-8')
+            if isinstance(s, unicode) else s for s in row_unicode)
+        data = [row_bytes] * 2
+        self.c.inserttable('test', data)
+        if unicode_strings:
+            data = [row_unicode] * 2
+        self.assertEqual(self.get_back(), data)
+
+    def testInserttableUnicodeUtf8(self):
+        try:
+            self.c.query("select '€', 'käse', 'сыр', 'pont-l''évêque'")
+        except pg.ProgrammingError:
+            self.skipTest("database does not support utf8")
+        # non-ascii chars do not fit in char(1) when there is no encoding
+        c = u'€' if self.has_encoding else u'$'
+        row_unicode = (0, 0, long(0), False, u'1970-01-01', u'00:00:00',
+            0.0, 0.0, 0.0, u'0.0',
+            c, u'bäd', u'bäd', u"käse сыр pont-l'évêque")
+        data = [row_unicode] * 2
+        self.c.inserttable('test', data)
+        if not unicode_strings:
+            row_bytes = tuple(s.encode('utf-8')
+                if isinstance(s, unicode) else s for s in row_unicode)
+            data = [row_bytes] * 2
         self.assertEqual(self.get_back(), data)
 
+    def testInserttableUnicodeLatin1(self):
+
+        try:
+            self.c.query("set client_encoding=latin1")
+            self.c.query("select '¥'")
+        except pg.ProgrammingError:
+            self.skipTest("database does not support latin1")
+        # non-ascii chars do not fit in char(1) when there is no encoding
+        c = u'€' if self.has_encoding else u'$'
+        row_unicode = (0, 0, long(0), False, u'1970-01-01', u'00:00:00',
+            0.0, 0.0, 0.0, u'0.0',
+            c, u'bäd', u'bäd', u"for käse and pont-l'évêque pay in €")
+        data = [row_unicode]
+        # cannot encode € sign with latin1 encoding
+        self.assertRaises(UnicodeEncodeError, self.c.inserttable, 'test', data)
+        row_unicode = tuple(s.replace(u'€', u'¥')
+            if isinstance(s, unicode) else s for s in row_unicode)
+        data = [row_unicode] * 2
+        self.c.inserttable('test', data)
+        if not unicode_strings:
+            row_bytes = tuple(s.encode('latin1')
+                if isinstance(s, unicode) else s for s in row_unicode)
+            data = [row_bytes] * 2
+        self.assertEqual(self.get_back('latin1'), data)
+
+    def testInserttableUnicodeLatin9(self):
+        try:
+            self.c.query("set client_encoding=latin9")
+            self.c.query("select '€'")
+        except pg.ProgrammingError:
+            self.skipTest("database does not support latin9")
+            return
+        # non-ascii chars do not fit in char(1) when there is no encoding
+        c = u'€' if self.has_encoding else u'$'
+        row_unicode = (0, 0, long(0), False, u'1970-01-01', u'00:00:00',
+            0.0, 0.0, 0.0, u'0.0',
+            c, u'bäd', u'bäd', u"for käse and pont-l'évêque pay in €")
+        data = [row_unicode] * 2
+        self.c.inserttable('test', data)
+        if not unicode_strings:
+            row_bytes = tuple(s.encode('latin9')
+                if isinstance(s, unicode) else s for s in row_unicode)
+            data = [row_bytes] * 2
+        self.assertEqual(self.get_back('latin9'), data)
+
+    def testInserttableNoEncoding(self):
+        self.c.query("set client_encoding=sql_ascii")
+        # non-ascii chars do not fit in char(1) when there is no encoding
+        c = u'€' if self.has_encoding else u'$'
+        row_unicode = (0, 0, long(0), False, u'1970-01-01', u'00:00:00',
+            0.0, 0.0, 0.0, u'0.0',
+            c, u'bäd', u'bäd', u"for käse and pont-l'évêque pay in €")
+        data = [row_unicode]
+        # cannot encode non-ascii unicode without a specific encoding
+        self.assertRaises(UnicodeEncodeError, self.c.inserttable, 'test', data)
+
 
 class TestDirectSocketAccess(unittest.TestCase):
     """"Test copy command with direct socket access."""
@@ -1025,9 +1158,15 @@
             else:
                 break
         else:
-            self.fail("Cannot set English money locale")
+            self.SkipTest("cannot set English money locale")
         pg.set_decimal_point('.')
-        r = query("select '34.25'::money").getresult()[0][0]
+        try:
+            r = query("select '34.25'::money")
+        except pg.ProgrammingError:
+            # this can happen if the currency signs cannot be
+            # converted using the encoding of the test database
+            self.skipTest('database does not support money')
+        r = r.getresult()[0][0]
         self.assertIsInstance(r, d)
         self.assertEqual(r, d('34.25'))
         pg.set_decimal_point(',')
@@ -1042,13 +1181,21 @@
             else:
                 break
         else:
-            self.fail("Cannot set English money locale")
+            self.SkipTest("cannot set German money locale")
         pg.set_decimal_point(',')
-        r = query("select '34,25'::money").getresult()[0][0]
+        try:
+            r = query("select '34,25'::money")
+        except pg.ProgrammingError:
+            self.skipTest('database does not support money')
+        r = r.getresult()[0][0]
         self.assertIsInstance(r, d)
         self.assertEqual(r, d('34.25'))
         pg.set_decimal_point('.')
-        r = query("select '34,25'::money").getresult()[0][0]
+        try:
+            r = query("select '34,25'::money")
+        except pg.ProgrammingError:
+            self.skipTest('database does not support money')
+        r = r.getresult()[0][0]
         self.assertNotEqual(r, d('34.25'))
         pg.set_decimal_point(point)
 
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql

Reply via email to