Author: cito
Date: Mon Nov 23 14:15:43 2015
New Revision: 613
Log:
Add some more testing of unicode issues
Modified:
trunk/module/TEST_PyGreSQL_classic_connection.py
Modified: trunk/module/TEST_PyGreSQL_classic_connection.py
==============================================================================
--- trunk/module/TEST_PyGreSQL_classic_connection.py Mon Nov 23 14:14:24 2015 (r612)
+++ trunk/module/TEST_PyGreSQL_classic_connection.py Mon Nov 23 14:15:43 2015 (r613)
@@ -27,6 +27,9 @@
# We need a database to test against. If LOCAL_PyGreSQL.py exists we will
# get our information from that. Otherwise we use the defaults.
+# These tests should be run with various PostgreSQL versions and databases
+# created with different encodings and locales. In particular, make sure the
+# tests are run against databases created with both SQL_ASCII and UTF8.
dbname = 'unittest'
dbhost = None
dbport = 5432
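A minimal sketch for preparing such test databases with the classic pg module
might look as follows (the database names, the local server and sufficient
privileges are assumptions, not part of the test suite):

    import pg

    # connect to a maintenance database; adjust host/user/password as needed
    con = pg.connect(dbname='postgres')
    # template0 avoids inheriting the encoding and locale of template1
    con.query("create database unittest_utf8"
        " encoding 'UTF8' template template0")
    con.query("create database unittest_ascii"
        " encoding 'SQL_ASCII' lc_collate 'C' lc_ctype 'C' template template0")
    con.close()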
@@ -41,6 +44,11 @@
except NameError: # Python >= 3.0
long = int
+try:
+ unicode
+except NameError: # Python >= 3.0
+ unicode = str
+
unicode_strings = str is not bytes
windows = os.name == 'nt'
@@ -540,7 +548,7 @@
def setUp(self):
self.c = connect()
- self.c.query('set client_encoding = utf8')
+ self.c.query('set client_encoding=utf8')
def tearDown(self):
self.c.close()
@@ -613,13 +621,21 @@
def testQueryWithUnicodeParams(self):
query = self.c.query
- query('set client_encoding = utf8')
+ try:
+ query('set client_encoding=utf8')
+ query("select 'wörld'").getresult()[0][0] == 'wörld'
+ except pg.ProgrammingError:
+ self.skipTest("database does not support utf8")
self.assertEqual(query("select $1||', '||$2||'!'",
('Hello', u'wörld')).getresult(), [('Hello, wörld!',)])
- self.assertEqual(query("select $1||', '||$2||'!'",
- ('Hello', u'мир')).getresult(),
- [('Hello, мир!',)])
- query('set client_encoding = latin1')
+
+ def testQueryWithUnicodeParamsLatin1(self):
+ query = self.c.query
+ try:
+ query('set client_encoding=latin1')
+ query("select 'wörld'").getresult()[0][0] == 'wörld'
+ except pg.ProgrammingError:
+ self.skipTest("database does not support latin1")
r = query("select $1||', '||$2||'!'", ('Hello', u'wörld')).getresult()
if unicode_strings:
self.assertEqual(r, [('Hello, wörld!',)])
@@ -627,25 +643,37 @@
self.assertEqual(r, [(u'Hello, wörld!'.encode('latin1'),)])
self.assertRaises(UnicodeError, query, "select $1||', '||$2||'!'",
('Hello', u'мир'))
- query('set client_encoding = iso_8859_1')
- r = query("select $1||', '||$2||'!'", ('Hello', u'wörld')).getresult()
+ query('set client_encoding=iso_8859_1')
+ r = query("select $1||', '||$2||'!'",
+ ('Hello', u'wörld')).getresult()
if unicode_strings:
self.assertEqual(r, [('Hello, wörld!',)])
else:
self.assertEqual(r, [(u'Hello, wörld!'.encode('latin1'),)])
self.assertRaises(UnicodeError, query, "select $1||', '||$2||'!'",
('Hello', u'мир'))
- query('set client_encoding = iso_8859_5')
+ query('set client_encoding=sql_ascii')
self.assertRaises(UnicodeError, query, "select $1||', '||$2||'!'",
('Hello', u'wörld'))
- r = query("select $1||', '||$2||'!'", ('Hello', u'мир')).getresult()
+
+ def testQueryWithUnicodeParamsCyrillic(self):
+ query = self.c.query
+ try:
+ query('set client_encoding=iso_8859_5')
+ query("select 'мир'").getresult()[0][0] == 'мир'
+ except pg.ProgrammingError:
+ self.skipTest("database does not support cyrillic")
+ self.assertRaises(UnicodeError, query, "select $1||', '||$2||'!'",
+ ('Hello', u'wörld'))
+ r = query("select $1||', '||$2||'!'",
+ ('Hello', u'мир')).getresult()
if unicode_strings:
self.assertEqual(r, [('Hello, мир!',)])
else:
self.assertEqual(r, [(u'Hello, мир!'.encode('cyrillic'),)])
- query('set client_encoding = sql_ascii')
+ query('set client_encoding=sql_ascii')
self.assertRaises(UnicodeError, query, "select $1||', '||$2||'!'",
- ('Hello', u'wörld'))
+ ('Hello', u'мир'))
def testQueryWithMixedParams(self):
self.assertEqual(self.c.query("select $1+2,$2||', world!'",
@@ -689,6 +717,11 @@
"i2 smallint, i4 integer, i8 bigint, b boolean, dt date, ti time,"
"d numeric, f4 real, f8 double precision, m money,"
"c char(1), v4 varchar(4), c4 char(4), t text)")
+ # Check whether the test database uses SQL_ASCII - this means
+ # that it does not consider encoding when calculating lengths.
+ c.query("set client_encoding=utf8")
+ cls.has_encoding = c.query(
+ "select length('ä') - length('a')").getresult()[0][0] == 0
c.close()
@classmethod
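The length('ä') probe above works because a database created with SQL_ASCII
measures string lengths in bytes, while a database created with a real
encoding measures them in characters. A rough Python analogue of the
difference that the db_len() helper below compensates for (illustrative only):

    s = u'käse'
    print(len(s))                  # 4 characters, as counted with a real encoding
    print(len(s.encode('utf-8')))  # 5 bytes, as counted by an SQL_ASCII database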
@@ -717,7 +750,15 @@
(2, 2, long(2), False, '1903-12-17', '11:22:00',
2.345678, 2.25, 2.125, '2.75', 'y', 'q', 'ijk', 'mnop\nstux!')]
- def get_back(self):
+ @classmethod
+ def db_len(cls, s, encoding):
+ if cls.has_encoding:
+ s = s if isinstance(s, unicode) else s.decode(encoding)
+ else:
+ s = s.encode(encoding) if isinstance(s, unicode) else s
+ return len(s)
+
+ def get_back(self, encoding='utf-8'):
"""Convert boolean and decimal values back."""
data = []
for row in self.c.query("select * from test order by 1").getresult():
@@ -751,13 +792,13 @@
row[9] = str(float(row[9]))
if row[10] is not None: # char(1)
self.assertIsInstance(row[10], str)
- self.assertEqual(len(row[10]), 1)
+ self.assertEqual(self.db_len(row[10], encoding), 1)
if row[11] is not None: # varchar(4)
self.assertIsInstance(row[11], str)
- self.assertLessEqual(len(row[11]), 4)
+ self.assertLessEqual(self.db_len(row[11], encoding), 4)
if row[12] is not None: # char(4)
self.assertIsInstance(row[12], str)
- self.assertEqual(len(row[12]), 4)
+ self.assertEqual(self.db_len(row[12], encoding), 4)
row[12] = row[12].rstrip()
if row[13] is not None: # text
self.assertIsInstance(row[13], str)
@@ -767,18 +808,18 @@
def testInserttable1Row(self):
data = self.data[2:3]
- self.c.inserttable("test", data)
+ self.c.inserttable('test', data)
self.assertEqual(self.get_back(), data)
def testInserttable4Rows(self):
data = self.data
- self.c.inserttable("test", data)
+ self.c.inserttable('test', data)
self.assertEqual(self.get_back(), data)
def testInserttableMultipleRows(self):
num_rows = 100
data = self.data[2:3] * num_rows
- self.c.inserttable("test", data)
+ self.c.inserttable('test', data)
r = self.c.query("select count(*) from test").getresult()[0][0]
self.assertEqual(r, num_rows)
@@ -786,13 +827,13 @@
num_rows = 10
data = self.data[2:3]
for _i in range(num_rows):
- self.c.inserttable("test", data)
+ self.c.inserttable('test', data)
r = self.c.query("select count(*) from test").getresult()[0][0]
self.assertEqual(r, num_rows)
def testInserttableNullValues(self):
data = [(None,) * 14] * 100
- self.c.inserttable("test", data)
+ self.c.inserttable('test', data)
self.assertEqual(self.get_back(), data)
def testInserttableMaxValues(self):
@@ -800,9 +841,101 @@
True, '2999-12-31', '11:59:59', 1e99,
1.0 + 1.0 / 32, 1.0 + 1.0 / 32, None,
"1", "1234", "1234", "1234" * 100)]
- self.c.inserttable("test", data)
+ self.c.inserttable('test', data)
+ self.assertEqual(self.get_back(), data)
+
+ def testInserttableByteValues(self):
+ try:
+ self.c.query("select '€', 'käse', 'сыр', 'pont-l''évêque'")
+ except pg.ProgrammingError:
+ self.skipTest("database does not support utf8")
+ # non-ascii chars do not fit in char(1) when there is no encoding
+ c = u'€' if self.has_encoding else u'$'
+ row_unicode = (0, 0, long(0), False, u'1970-01-01', u'00:00:00',
+ 0.0, 0.0, 0.0, u'0.0',
+ c, u'bäd', u'bäd', u"käse сыр pont-l'évêque")
+ row_bytes = tuple(s.encode('utf-8')
+ if isinstance(s, unicode) else s for s in row_unicode)
+ data = [row_bytes] * 2
+ self.c.inserttable('test', data)
+ if unicode_strings:
+ data = [row_unicode] * 2
+ self.assertEqual(self.get_back(), data)
+
+ def testInserttableUnicodeUtf8(self):
+ try:
+ self.c.query("select '€', 'käse', 'сыр', 'pont-l''évêque'")
+ except pg.ProgrammingError:
+ self.skipTest("database does not support utf8")
+ # non-ascii chars do not fit in char(1) when there is no encoding
+ c = u'€' if self.has_encoding else u'$'
+ row_unicode = (0, 0, long(0), False, u'1970-01-01', u'00:00:00',
+ 0.0, 0.0, 0.0, u'0.0',
+ c, u'bäd', u'bäd', u"käse сыр pont-l'évêque")
+ data = [row_unicode] * 2
+ self.c.inserttable('test', data)
+ if not unicode_strings:
+ row_bytes = tuple(s.encode('utf-8')
+ if isinstance(s, unicode) else s for s in row_unicode)
+ data = [row_bytes] * 2
self.assertEqual(self.get_back(), data)
+
+ def testInserttableUnicodeLatin1(self):
+ try:
+ self.c.query("set client_encoding=latin1")
+ self.c.query("select '¥'")
+ except pg.ProgrammingError:
+ self.skipTest("database does not support latin1")
+ # non-ascii chars do not fit in char(1) when there is no encoding
+ c = u'€' if self.has_encoding else u'$'
+ row_unicode = (0, 0, long(0), False, u'1970-01-01', u'00:00:00',
+ 0.0, 0.0, 0.0, u'0.0',
+ c, u'bäd', u'bäd', u"for käse and pont-l'évêque pay in €")
+ data = [row_unicode]
+ # cannot encode € sign with latin1 encoding
+ self.assertRaises(UnicodeEncodeError, self.c.inserttable, 'test', data)
+ row_unicode = tuple(s.replace(u'€', u'¥')
+ if isinstance(s, unicode) else s for s in row_unicode)
+ data = [row_unicode] * 2
+ self.c.inserttable('test', data)
+ if not unicode_strings:
+ row_bytes = tuple(s.encode('latin1')
+ if isinstance(s, unicode) else s for s in row_unicode)
+ data = [row_bytes] * 2
+ self.assertEqual(self.get_back('latin1'), data)
+
+ def testInserttableUnicodeLatin9(self):
+ try:
+ self.c.query("set client_encoding=latin9")
+ self.c.query("select '€'")
+ except pg.ProgrammingError:
+ self.skipTest("database does not support latin9")
+ return
+ # non-ascii chars do not fit in char(1) when there is no encoding
+ c = u'€' if self.has_encoding else u'$'
+ row_unicode = (0, 0, long(0), False, u'1970-01-01', u'00:00:00',
+ 0.0, 0.0, 0.0, u'0.0',
+ c, u'bäd', u'bäd', u"for käse and pont-l'évêque pay in €")
+ data = [row_unicode] * 2
+ self.c.inserttable('test', data)
+ if not unicode_strings:
+ row_bytes = tuple(s.encode('latin9')
+ if isinstance(s, unicode) else s for s in row_unicode)
+ data = [row_bytes] * 2
+ self.assertEqual(self.get_back('latin9'), data)
+
+ def testInserttableNoEncoding(self):
+ self.c.query("set client_encoding=sql_ascii")
+ # non-ascii chars do not fit in char(1) when there is no encoding
+ c = u'€' if self.has_encoding else u'$'
+ row_unicode = (0, 0, long(0), False, u'1970-01-01', u'00:00:00',
+ 0.0, 0.0, 0.0, u'0.0',
+ c, u'bäd', u'bäd', u"for käse and pont-l'évêque pay in €")
+ data = [row_unicode]
+ # cannot encode non-ascii unicode without a specific encoding
+ self.assertRaises(UnicodeEncodeError, self.c.inserttable, 'test', data)
+
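As background for the latin1/latin9 tests above: the euro sign exists in
ISO-8859-15 (latin9) but not in ISO-8859-1 (latin1), which can be checked
directly with Python's codecs (illustrative only):

    u'€'.encode('latin9')   # succeeds, the euro sign is 0xa4 in ISO-8859-15
    u'€'.encode('latin1')   # raises UnicodeEncodeError, as the tests expect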
class TestDirectSocketAccess(unittest.TestCase):
""""Test copy command with direct socket access."""
@@ -1025,9 +1158,15 @@
else:
break
else:
- self.fail("Cannot set English money locale")
+ self.skipTest("cannot set English money locale")
pg.set_decimal_point('.')
- r = query("select '34.25'::money").getresult()[0][0]
+ try:
+ r = query("select '34.25'::money")
+ except pg.ProgrammingError:
+ # this can happen if the currency signs cannot be
+ # converted using the encoding of the test database
+ self.skipTest('database does not support money')
+ r = r.getresult()[0][0]
self.assertIsInstance(r, d)
self.assertEqual(r, d('34.25'))
pg.set_decimal_point(',')
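As background, a minimal sketch of the parsing these money tests rely on
(the database name and the lc_monetary locale are assumptions and must exist
on the test system):

    import pg

    con = pg.connect(dbname='unittest')
    con.query("set lc_monetary = 'en_US.UTF-8'")  # assumed locale name
    pg.set_decimal_point('.')
    # the money value is returned as a localized string and parsed into
    # a Decimal using the decimal point configured above
    print(con.query("select '34.25'::money").getresult()[0][0])
    con.close()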
@@ -1042,13 +1181,21 @@
else:
break
else:
- self.fail("Cannot set English money locale")
+ self.skipTest("cannot set German money locale")
pg.set_decimal_point(',')
- r = query("select '34,25'::money").getresult()[0][0]
+ try:
+ r = query("select '34,25'::money")
+ except pg.ProgrammingError:
+ self.skipTest('database does not support money')
+ r = r.getresult()[0][0]
self.assertIsInstance(r, d)
self.assertEqual(r, d('34.25'))
pg.set_decimal_point('.')
- r = query("select '34,25'::money").getresult()[0][0]
+ try:
+ r = query("select '34,25'::money")
+ except pg.ProgrammingError:
+ self.skipTest('database does not support money')
+ r = r.getresult()[0][0]
self.assertNotEqual(r, d('34.25'))
pg.set_decimal_point(point)