Author: cito
Date: Sun Nov 22 08:59:01 2015
New Revision: 599
Log:
Fix a few Python 3 issues with the main tests
Modified:
trunk/module/TEST_PyGreSQL_classic.py
trunk/module/TEST_PyGreSQL_dbapi20.py
trunk/module/dbapi20.py
trunk/module/pgdb.py
Modified: trunk/module/TEST_PyGreSQL_classic.py
==============================================================================
--- trunk/module/TEST_PyGreSQL_classic.py Sun Nov 22 08:36:55 2015 (r598)
+++ trunk/module/TEST_PyGreSQL_classic.py Sun Nov 22 08:59:01 2015 (r599)
@@ -20,6 +20,7 @@
except ImportError:
pass
+
def opendb():
db = DB(dbname, dbhost, dbport)
db.query("SET DATESTYLE TO 'ISO'")
@@ -35,10 +36,13 @@
"DROP SCHEMA _test1",
"DROP SCHEMA _test2",
):
- try: db.query(q)
- except: pass
+ try:
+ db.query(q)
+ except Exception:
+ pass
db.close()
+
class UtilityTest(unittest.TestCase):
def setUp(self):
@@ -258,7 +262,7 @@
thread = Thread(None, target)
thread.start()
# Wait until the thread has started.
- for n in xrange(500):
+ for n in range(500):
if target.listening:
break
sleep(0.01)
@@ -272,7 +276,7 @@
else:
db2.query("notify event_1, 'payload 1'")
# Wait until the notification has been caught.
- for n in xrange(500):
+ for n in range(500):
if arg_dict['called'] or self.notify_timeout:
break
sleep(0.01)
@@ -291,7 +295,7 @@
db2.query("notify stop_event_1, 'payload 2'")
db2.close()
# Wait until the notification has been caught.
- for n in xrange(500):
+ for n in range(500):
if arg_dict['called'] or self.notify_timeout:
break
sleep(0.01)
@@ -331,8 +335,10 @@
if __name__ == '__main__':
suite = unittest.TestSuite()
- if len(sys.argv) > 1: test_list = sys.argv[1:]
- else: test_list = unittest.getTestCaseNames(UtilityTest, 'test_')
+ if len(sys.argv) > 1:
+ test_list = sys.argv[1:]
+ else:
+ test_list = unittest.getTestCaseNames(UtilityTest, 'test_')
if len(sys.argv) == 2 and sys.argv[1] == '-l':
print('\n'.join(unittest.getTestCaseNames(UtilityTest, 'test_')))
@@ -341,10 +347,9 @@
for test_name in test_list:
try:
suite.addTest(UtilityTest(test_name))
- except:
+ except Exception:
print("\n ERROR: %s.\n" % sys.exc_value)
sys.exit(1)
rc = unittest.TextTestRunner(verbosity=1).run(suite)
- sys.exit(len(rc.errors+rc.failures) != 0)
-
+ sys.exit(1 if rc.errors or rc.failures else 0)
Modified: trunk/module/TEST_PyGreSQL_dbapi20.py
==============================================================================
--- trunk/module/TEST_PyGreSQL_dbapi20.py Sun Nov 22 08:36:55 2015 (r598)
+++ trunk/module/TEST_PyGreSQL_dbapi20.py Sun Nov 22 08:59:01 2015 (r599)
@@ -49,7 +49,7 @@
con = self._connect()
cur = myCursor(con)
ret = cur.execute("select 1 as a, 2 as b")
- self.assert_(ret is cur, 'execute() should return cursor')
+ self.assertTrue(ret is cur, 'execute() should return cursor')
self.assertEqual(cur.fetchone(), {'a': 1, 'b': 2})
def test_cursor_iteration(self):
@@ -102,15 +102,15 @@
try:
cur.execute("select 1/0")
except pgdb.DatabaseError as error:
- self.assert_(isinstance(error, pgdb.ProgrammingError))
+ self.assertTrue(isinstance(error, pgdb.ProgrammingError))
# the SQLSTATE error code for division by zero is 22012
self.assertEqual(error.sqlstate, '22012')
def test_float(self):
nan, inf = float('nan'), float('inf')
from math import isnan, isinf
- self.assert_(isnan(nan) and not isinf(nan))
- self.assert_(isinf(inf) and not isnan(inf))
+ self.assertTrue(isnan(nan) and not isinf(nan))
+ self.assertTrue(isinf(inf) and not isnan(inf))
values = [0, 1, 0.03125, -42.53125, nan, inf, -inf]
table = self.table_prefix + 'booze'
con = self._connect()
@@ -128,36 +128,36 @@
rows = [row[1] for row in rows]
for inval, outval in zip(values, rows):
if isinf(inval):
- self.assert_(isinf(outval))
+ self.assertTrue(isinf(outval))
if inval < 0:
- self.assert_(outval < 0)
+ self.assertTrue(outval < 0)
else:
- self.assert_(outval > 0)
+ self.assertTrue(outval > 0)
elif isnan(inval):
- self.assert_(isnan(outval))
+ self.assertTrue(isnan(outval))
else:
self.assertEqual(inval, outval)
def test_set_decimal_type(self):
decimal_type = pgdb.decimal_type()
- self.assert_(decimal_type is not None and callable(decimal_type))
+ self.assertTrue(decimal_type is not None and callable(decimal_type))
con = self._connect()
try:
cur = con.cursor()
- self.assert_(pgdb.decimal_type(int) is int)
+ self.assertTrue(pgdb.decimal_type(int) is int)
cur.execute('select 42')
value = cur.fetchone()[0]
- self.assert_(isinstance(value, int))
+ self.assertTrue(isinstance(value, int))
self.assertEqual(value, 42)
- self.assert_(pgdb.decimal_type(float) is float)
+ self.assertTrue(pgdb.decimal_type(float) is float)
cur.execute('select 4.25')
value = cur.fetchone()[0]
- self.assert_(isinstance(value, float))
+ self.assertTrue(isinstance(value, float))
self.assertEqual(value, 4.25)
finally:
con.close()
pgdb.decimal_type(decimal_type)
- self.assert_(pgdb.decimal_type() is decimal_type)
+ self.assertTrue(pgdb.decimal_type() is decimal_type)
def test_nextset(self):
con = self._connect()
@@ -247,11 +247,11 @@
self.assertEqual('int8', pgdb.INTEGER)
self.assertNotEqual('int4', pgdb.LONG)
self.assertEqual('int8', pgdb.LONG)
- self.assert_('char' in pgdb.STRING)
- self.assert_(pgdb.NUMERIC <= pgdb.NUMBER)
- self.assert_(pgdb.NUMBER >= pgdb.INTEGER)
- self.assert_(pgdb.TIME <= pgdb.DATETIME)
- self.assert_(pgdb.DATETIME >= pgdb.DATE)
+ self.assertTrue('char' in pgdb.STRING)
+ self.assertTrue(pgdb.NUMERIC <= pgdb.NUMBER)
+ self.assertTrue(pgdb.NUMBER >= pgdb.INTEGER)
+ self.assertTrue(pgdb.TIME <= pgdb.DATETIME)
+ self.assertTrue(pgdb.DATETIME >= pgdb.DATE)
if __name__ == '__main__':
Modified: trunk/module/dbapi20.py
==============================================================================
--- trunk/module/dbapi20.py Sun Nov 22 08:36:55 2015 (r598)
+++ trunk/module/dbapi20.py Sun Nov 22 08:59:01 2015 (r599)
@@ -159,7 +159,7 @@
# Must exist
threadsafety = self.driver.threadsafety
# Must be a valid value
- self.failUnless(threadsafety in (0,1,2,3))
+ self.assertTrue(threadsafety in (0,1,2,3))
except AttributeError:
self.fail("Driver doesn't define threadsafety")
@@ -168,7 +168,7 @@
# Must exist
paramstyle = self.driver.paramstyle
# Must be a valid value
- self.failUnless(paramstyle in (
+ self.assertTrue(paramstyle in (
'qmark','numeric','named','format','pyformat'
))
except AttributeError:
@@ -178,27 +178,27 @@
"""Make sure required exceptions exist, and are in the
defined hierarchy.
"""
- self.failUnless(issubclass(self.driver.Warning,Exception))
- self.failUnless(issubclass(self.driver.Error,Exception))
- self.failUnless(
+ self.assertTrue(issubclass(self.driver.Warning,Exception))
+ self.assertTrue(issubclass(self.driver.Error,Exception))
+ self.assertTrue(
issubclass(self.driver.InterfaceError,self.driver.Error)
)
- self.failUnless(
+ self.assertTrue(
issubclass(self.driver.DatabaseError,self.driver.Error)
)
- self.failUnless(
+ self.assertTrue(
issubclass(self.driver.OperationalError,self.driver.Error)
)
- self.failUnless(
+ self.assertTrue(
issubclass(self.driver.IntegrityError,self.driver.Error)
)
- self.failUnless(
+ self.assertTrue(
issubclass(self.driver.InternalError,self.driver.Error)
)
- self.failUnless(
+ self.assertTrue(
issubclass(self.driver.ProgrammingError,self.driver.Error)
)
- self.failUnless(
+ self.assertTrue(
issubclass(self.driver.NotSupportedError,self.driver.Error)
)
@@ -213,15 +213,15 @@
"""
con = self._connect()
drv = self.driver
- self.failUnless(con.Warning is drv.Warning)
- self.failUnless(con.Error is drv.Error)
- self.failUnless(con.InterfaceError is drv.InterfaceError)
- self.failUnless(con.DatabaseError is drv.DatabaseError)
- self.failUnless(con.OperationalError is drv.OperationalError)
- self.failUnless(con.IntegrityError is drv.IntegrityError)
- self.failUnless(con.InternalError is drv.InternalError)
- self.failUnless(con.ProgrammingError is drv.ProgrammingError)
- self.failUnless(con.NotSupportedError is drv.NotSupportedError)
+ self.assertTrue(con.Warning is drv.Warning)
+ self.assertTrue(con.Error is drv.Error)
+ self.assertTrue(con.InterfaceError is drv.InterfaceError)
+ self.assertTrue(con.DatabaseError is drv.DatabaseError)
+ self.assertTrue(con.OperationalError is drv.OperationalError)
+ self.assertTrue(con.IntegrityError is drv.IntegrityError)
+ self.assertTrue(con.InternalError is drv.InternalError)
+ self.assertTrue(con.ProgrammingError is drv.ProgrammingError)
+ self.assertTrue(con.NotSupportedError is drv.NotSupportedError)
def test_commit(self):
con = self._connect()
@@ -312,12 +312,12 @@
cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
self.table_prefix
))
- self.failUnless(cur.rowcount in (-1,1),
+ self.assertTrue(cur.rowcount in (-1,1),
'cursor.rowcount should == number or rows inserted, or '
'set to -1 after executing an insert statement'
)
cur.execute("select name from %sbooze" % self.table_prefix)
- self.failUnless(cur.rowcount in (-1,1),
+ self.assertTrue(cur.rowcount in (-1,1),
'cursor.rowcount should == number of rows returned, or '
'set to -1 after executing a select statement'
)
@@ -380,7 +380,7 @@
cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
self.table_prefix
))
- self.failUnless(cur.rowcount in (-1,1))
+ self.assertTrue(cur.rowcount in (-1,1))
if self.driver.paramstyle == 'qmark':
cur.execute(
@@ -409,7 +409,7 @@
)
else:
self.fail('Invalid paramstyle')
- self.failUnless(cur.rowcount in (-1,1))
+ self.assertTrue(cur.rowcount in (-1,1))
cur.execute('select name from %sbooze' % self.table_prefix)
res = cur.fetchall()
@@ -461,7 +461,7 @@
)
else:
self.fail('Unknown paramstyle')
- self.failUnless(cur.rowcount in (-1,2),
+ self.assertTrue(cur.rowcount in (-1,2),
'insert using cursor.executemany set cursor.rowcount to '
'incorrect value %r' % cur.rowcount
)
@@ -496,7 +496,7 @@
'cursor.fetchone should return None if a query retrieves '
'no rows'
)
- self.failUnless(cur.rowcount in (-1,0))
+ self.assertTrue(cur.rowcount in (-1,0))
# cursor.fetchone should raise an Error if called after
# executing a query that cannnot return rows
@@ -516,7 +516,7 @@
self.assertEqual(cur.fetchone(),None,
'cursor.fetchone should return None if no more rows available'
)
- self.failUnless(cur.rowcount in (-1,1))
+ self.assertTrue(cur.rowcount in (-1,1))
finally:
con.close()
@@ -572,7 +572,7 @@
'cursor.fetchmany should return an empty sequence after '
'results are exhausted'
)
- self.failUnless(cur.rowcount in (-1,6))
+ self.assertTrue(cur.rowcount in (-1,6))
# Same as above, using cursor.arraysize
cur.arraysize=4
@@ -585,12 +585,12 @@
self.assertEqual(len(r),2)
r = cur.fetchmany() # Should be an empty sequence
self.assertEqual(len(r),0)
- self.failUnless(cur.rowcount in (-1,6))
+ self.assertTrue(cur.rowcount in (-1,6))
cur.arraysize=6
cur.execute('select name from %sbooze' % self.table_prefix)
rows = cur.fetchmany() # Should get all rows
- self.failUnless(cur.rowcount in (-1,6))
+ self.assertTrue(cur.rowcount in (-1,6))
self.assertEqual(len(rows),6)
self.assertEqual(len(rows),6)
rows = [r[0] for r in rows]
@@ -607,7 +607,7 @@
'cursor.fetchmany should return an empty sequence if '
'called after the whole result set has been fetched'
)
- self.failUnless(cur.rowcount in (-1,6))
+ self.assertTrue(cur.rowcount in (-1,6))
self.executeDDL2(cur)
cur.execute('select name from %sbarflys' % self.table_prefix)
@@ -616,7 +616,7 @@
'cursor.fetchmany should return an empty sequence if '
'query retrieved no rows'
)
- self.failUnless(cur.rowcount in (-1,0))
+ self.assertTrue(cur.rowcount in (-1,0))
finally:
con.close()
@@ -640,7 +640,7 @@
cur.execute('select name from %sbooze' % self.table_prefix)
rows = cur.fetchall()
- self.failUnless(cur.rowcount in (-1,len(self.samples)))
+ self.assertTrue(cur.rowcount in (-1,len(self.samples)))
self.assertEqual(len(rows),len(self.samples),
'cursor.fetchall did not retrieve all rows'
)
@@ -656,12 +656,12 @@
'cursor.fetchall should return an empty list if called '
'after the whole result set has been fetched'
)
- self.failUnless(cur.rowcount in (-1,len(self.samples)))
+ self.assertTrue(cur.rowcount in (-1,len(self.samples)))
self.executeDDL2(cur)
cur.execute('select name from %sbarflys' % self.table_prefix)
rows = cur.fetchall()
- self.failUnless(cur.rowcount in (-1,0))
+ self.assertTrue(cur.rowcount in (-1,0))
self.assertEqual(len(rows),0,
'cursor.fetchall should return an empty list if '
'a select query returns no rows'
@@ -683,7 +683,7 @@
rows23 = cur.fetchmany(2)
rows4 = cur.fetchone()
rows56 = cur.fetchall()
- self.failUnless(cur.rowcount in (-1,6))
+ self.assertTrue(cur.rowcount in (-1,6))
self.assertEqual(len(rows23),2,
'fetchmany returned incorrect number of rows'
)
@@ -762,7 +762,7 @@
con = self._connect()
try:
cur = con.cursor()
- self.failUnless(hasattr(cur,'arraysize'),
+ self.assertTrue(hasattr(cur,'arraysize'),
'cursor.arraysize must be defined'
)
finally:
@@ -831,27 +831,27 @@
b = self.driver.Binary('')
def test_STRING(self):
- self.failUnless(hasattr(self.driver,'STRING'),
+ self.assertTrue(hasattr(self.driver,'STRING'),
'module.STRING must be defined'
)
def test_BINARY(self):
- self.failUnless(hasattr(self.driver,'BINARY'),
+ self.assertTrue(hasattr(self.driver,'BINARY'),
'module.BINARY must be defined.'
)
def test_NUMBER(self):
- self.failUnless(hasattr(self.driver,'NUMBER'),
+ self.assertTrue(hasattr(self.driver,'NUMBER'),
'module.NUMBER must be defined.'
)
def test_DATETIME(self):
- self.failUnless(hasattr(self.driver,'DATETIME'),
+ self.assertTrue(hasattr(self.driver,'DATETIME'),
'module.DATETIME must be defined.'
)
def test_ROWID(self):
- self.failUnless(hasattr(self.driver,'ROWID'),
+ self.assertTrue(hasattr(self.driver,'ROWID'),
'module.ROWID must be defined.'
)
Modified: trunk/module/pgdb.py
==============================================================================
--- trunk/module/pgdb.py Sun Nov 22 08:36:55 2015 (r598)
+++ trunk/module/pgdb.py Sun Nov 22 08:59:01 2015 (r599)
@@ -70,6 +70,16 @@
from decimal import Decimal
from math import isnan, isinf
+try:
+ long
+except NameError: # Python >= 3.0
+ long = int
+
+try:
+ basestring
+except NameError: # Python >= 3.0
+ basestring = (str, bytes)
+
set_decimal(Decimal)
@@ -228,9 +238,9 @@
"""Quote value depending on its type."""
if isinstance(val, (datetime, date, time, timedelta)):
val = str(val)
- elif isinstance(val, unicode):
+ elif isinstance(val, str) and str is not bytes:
val = val.encode('utf8')
- if isinstance(val, str):
+ if isinstance(val, bytes):
if isinstance(val, Binary):
val = self._cnx.escape_bytea(val)
else:
@@ -594,16 +604,10 @@
"""
- if frozenset.__module__ == '__builtin__':
- def __new__(cls, values):
- if isinstance(values, basestring):
- values = values.split()
- return super(pgdbType, cls).__new__(cls, values)
- else: # Python < 2.4
- def __init__(self, values):
- if isinstance(values, basestring):
- values = values.split()
- super(pgdbType, self).__init__(values)
+ def __new__(cls, values):
+ if isinstance(values, basestring):
+ values = values.split()
+ return super(pgdbType, cls).__new__(cls, values)
def __eq__(self, other):
if isinstance(other, basestring):
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql