Author: cito
Date: Sun Nov 22 08:59:01 2015
New Revision: 599

Log:
Fix a few Python 3 issues with the main tests

Modified:
   trunk/module/TEST_PyGreSQL_classic.py
   trunk/module/TEST_PyGreSQL_dbapi20.py
   trunk/module/dbapi20.py
   trunk/module/pgdb.py

Modified: trunk/module/TEST_PyGreSQL_classic.py
==============================================================================
--- trunk/module/TEST_PyGreSQL_classic.py       Sun Nov 22 08:36:55 2015        
(r598)
+++ trunk/module/TEST_PyGreSQL_classic.py       Sun Nov 22 08:59:01 2015        
(r599)
@@ -20,6 +20,7 @@
 except ImportError:
     pass
 
+
 def opendb():
     db = DB(dbname, dbhost, dbport)
     db.query("SET DATESTYLE TO 'ISO'")
@@ -35,10 +36,13 @@
     "DROP SCHEMA _test1",
     "DROP SCHEMA _test2",
 ):
-    try: db.query(q)
-    except: pass
+    try:
+        db.query(q)
+    except Exception:
+        pass
 db.close()
 
+
 class UtilityTest(unittest.TestCase):
 
     def setUp(self):
@@ -258,7 +262,7 @@
                 thread = Thread(None, target)
                 thread.start()
                 # Wait until the thread has started.
-                for n in xrange(500):
+                for n in range(500):
                     if target.listening:
                         break
                     sleep(0.01)
@@ -272,7 +276,7 @@
                 else:
                     db2.query("notify event_1, 'payload 1'")
                 # Wait until the notification has been caught.
-                for n in xrange(500):
+                for n in range(500):
                     if arg_dict['called'] or self.notify_timeout:
                         break
                     sleep(0.01)
@@ -291,7 +295,7 @@
                     db2.query("notify stop_event_1, 'payload 2'")
                 db2.close()
                 # Wait until the notification has been caught.
-                for n in xrange(500):
+                for n in range(500):
                     if arg_dict['called'] or self.notify_timeout:
                         break
                     sleep(0.01)
@@ -331,8 +335,10 @@
 if __name__ == '__main__':
     suite = unittest.TestSuite()
 
-    if len(sys.argv) > 1: test_list = sys.argv[1:]
-    else: test_list = unittest.getTestCaseNames(UtilityTest, 'test_')
+    if len(sys.argv) > 1:
+        test_list = sys.argv[1:]
+    else:
+        test_list = unittest.getTestCaseNames(UtilityTest, 'test_')
 
     if len(sys.argv) == 2 and sys.argv[1] == '-l':
         print('\n'.join(unittest.getTestCaseNames(UtilityTest, 'test_')))
@@ -341,10 +347,9 @@
     for test_name in test_list:
         try:
             suite.addTest(UtilityTest(test_name))
-        except:
+        except Exception:
             print("\n ERROR: %s.\n" % sys.exc_value)
             sys.exit(1)
 
     rc = unittest.TextTestRunner(verbosity=1).run(suite)
-    sys.exit(len(rc.errors+rc.failures) != 0)
-
+    sys.exit(1 if rc.errors or rc.failures else 0)

Modified: trunk/module/TEST_PyGreSQL_dbapi20.py
==============================================================================
--- trunk/module/TEST_PyGreSQL_dbapi20.py       Sun Nov 22 08:36:55 2015        
(r598)
+++ trunk/module/TEST_PyGreSQL_dbapi20.py       Sun Nov 22 08:59:01 2015        
(r599)
@@ -49,7 +49,7 @@
         con = self._connect()
         cur = myCursor(con)
         ret = cur.execute("select 1 as a, 2 as b")
-        self.assert_(ret is cur, 'execute() should return cursor')
+        self.assertTrue(ret is cur, 'execute() should return cursor')
         self.assertEqual(cur.fetchone(), {'a': 1, 'b': 2})
 
     def test_cursor_iteration(self):
@@ -102,15 +102,15 @@
         try:
             cur.execute("select 1/0")
         except pgdb.DatabaseError as error:
-            self.assert_(isinstance(error, pgdb.ProgrammingError))
+            self.assertTrue(isinstance(error, pgdb.ProgrammingError))
             # the SQLSTATE error code for division by zero is 22012
             self.assertEqual(error.sqlstate, '22012')
 
     def test_float(self):
         nan, inf = float('nan'), float('inf')
         from math import isnan, isinf
-        self.assert_(isnan(nan) and not isinf(nan))
-        self.assert_(isinf(inf) and not isnan(inf))
+        self.assertTrue(isnan(nan) and not isinf(nan))
+        self.assertTrue(isinf(inf) and not isnan(inf))
         values = [0, 1, 0.03125, -42.53125, nan, inf, -inf]
         table = self.table_prefix + 'booze'
         con = self._connect()
@@ -128,36 +128,36 @@
         rows = [row[1] for row in rows]
         for inval, outval in zip(values, rows):
             if isinf(inval):
-                self.assert_(isinf(outval))
+                self.assertTrue(isinf(outval))
                 if inval < 0:
-                    self.assert_(outval < 0)
+                    self.assertTrue(outval < 0)
                 else:
-                    self.assert_(outval > 0)
+                    self.assertTrue(outval > 0)
             elif isnan(inval):
-                self.assert_(isnan(outval))
+                self.assertTrue(isnan(outval))
             else:
                 self.assertEqual(inval, outval)
 
     def test_set_decimal_type(self):
         decimal_type = pgdb.decimal_type()
-        self.assert_(decimal_type is not None and callable(decimal_type))
+        self.assertTrue(decimal_type is not None and callable(decimal_type))
         con = self._connect()
         try:
             cur = con.cursor()
-            self.assert_(pgdb.decimal_type(int) is int)
+            self.assertTrue(pgdb.decimal_type(int) is int)
             cur.execute('select 42')
             value = cur.fetchone()[0]
-            self.assert_(isinstance(value, int))
+            self.assertTrue(isinstance(value, int))
             self.assertEqual(value, 42)
-            self.assert_(pgdb.decimal_type(float) is float)
+            self.assertTrue(pgdb.decimal_type(float) is float)
             cur.execute('select 4.25')
             value = cur.fetchone()[0]
-            self.assert_(isinstance(value, float))
+            self.assertTrue(isinstance(value, float))
             self.assertEqual(value, 4.25)
         finally:
             con.close()
             pgdb.decimal_type(decimal_type)
-        self.assert_(pgdb.decimal_type() is decimal_type)
+        self.assertTrue(pgdb.decimal_type() is decimal_type)
 
     def test_nextset(self):
         con = self._connect()
@@ -247,11 +247,11 @@
         self.assertEqual('int8', pgdb.INTEGER)
         self.assertNotEqual('int4', pgdb.LONG)
         self.assertEqual('int8', pgdb.LONG)
-        self.assert_('char' in pgdb.STRING)
-        self.assert_(pgdb.NUMERIC <= pgdb.NUMBER)
-        self.assert_(pgdb.NUMBER >= pgdb.INTEGER)
-        self.assert_(pgdb.TIME <= pgdb.DATETIME)
-        self.assert_(pgdb.DATETIME >= pgdb.DATE)
+        self.assertTrue('char' in pgdb.STRING)
+        self.assertTrue(pgdb.NUMERIC <= pgdb.NUMBER)
+        self.assertTrue(pgdb.NUMBER >= pgdb.INTEGER)
+        self.assertTrue(pgdb.TIME <= pgdb.DATETIME)
+        self.assertTrue(pgdb.DATETIME >= pgdb.DATE)
 
 
 if __name__ == '__main__':

Modified: trunk/module/dbapi20.py
==============================================================================
--- trunk/module/dbapi20.py     Sun Nov 22 08:36:55 2015        (r598)
+++ trunk/module/dbapi20.py     Sun Nov 22 08:59:01 2015        (r599)
@@ -159,7 +159,7 @@
             # Must exist
             threadsafety = self.driver.threadsafety
             # Must be a valid value
-            self.failUnless(threadsafety in (0,1,2,3))
+            self.assertTrue(threadsafety in (0,1,2,3))
         except AttributeError:
             self.fail("Driver doesn't define threadsafety")
 
@@ -168,7 +168,7 @@
             # Must exist
             paramstyle = self.driver.paramstyle
             # Must be a valid value
-            self.failUnless(paramstyle in (
+            self.assertTrue(paramstyle in (
                 'qmark','numeric','named','format','pyformat'
                 ))
         except AttributeError:
@@ -178,27 +178,27 @@
         """Make sure required exceptions exist, and are in the
         defined hierarchy.
         """
-        self.failUnless(issubclass(self.driver.Warning,Exception))
-        self.failUnless(issubclass(self.driver.Error,Exception))
-        self.failUnless(
+        self.assertTrue(issubclass(self.driver.Warning,Exception))
+        self.assertTrue(issubclass(self.driver.Error,Exception))
+        self.assertTrue(
             issubclass(self.driver.InterfaceError,self.driver.Error)
             )
-        self.failUnless(
+        self.assertTrue(
             issubclass(self.driver.DatabaseError,self.driver.Error)
             )
-        self.failUnless(
+        self.assertTrue(
             issubclass(self.driver.OperationalError,self.driver.Error)
             )
-        self.failUnless(
+        self.assertTrue(
             issubclass(self.driver.IntegrityError,self.driver.Error)
             )
-        self.failUnless(
+        self.assertTrue(
             issubclass(self.driver.InternalError,self.driver.Error)
             )
-        self.failUnless(
+        self.assertTrue(
             issubclass(self.driver.ProgrammingError,self.driver.Error)
             )
-        self.failUnless(
+        self.assertTrue(
             issubclass(self.driver.NotSupportedError,self.driver.Error)
             )
 
@@ -213,15 +213,15 @@
         """
         con = self._connect()
         drv = self.driver
-        self.failUnless(con.Warning is drv.Warning)
-        self.failUnless(con.Error is drv.Error)
-        self.failUnless(con.InterfaceError is drv.InterfaceError)
-        self.failUnless(con.DatabaseError is drv.DatabaseError)
-        self.failUnless(con.OperationalError is drv.OperationalError)
-        self.failUnless(con.IntegrityError is drv.IntegrityError)
-        self.failUnless(con.InternalError is drv.InternalError)
-        self.failUnless(con.ProgrammingError is drv.ProgrammingError)
-        self.failUnless(con.NotSupportedError is drv.NotSupportedError)
+        self.assertTrue(con.Warning is drv.Warning)
+        self.assertTrue(con.Error is drv.Error)
+        self.assertTrue(con.InterfaceError is drv.InterfaceError)
+        self.assertTrue(con.DatabaseError is drv.DatabaseError)
+        self.assertTrue(con.OperationalError is drv.OperationalError)
+        self.assertTrue(con.IntegrityError is drv.IntegrityError)
+        self.assertTrue(con.InternalError is drv.InternalError)
+        self.assertTrue(con.ProgrammingError is drv.ProgrammingError)
+        self.assertTrue(con.NotSupportedError is drv.NotSupportedError)
 
     def test_commit(self):
         con = self._connect()
@@ -312,12 +312,12 @@
             cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
                 self.table_prefix
                 ))
-            self.failUnless(cur.rowcount in (-1,1),
+            self.assertTrue(cur.rowcount in (-1,1),
                 'cursor.rowcount should == number or rows inserted, or '
                 'set to -1 after executing an insert statement'
                 )
             cur.execute("select name from %sbooze" % self.table_prefix)
-            self.failUnless(cur.rowcount in (-1,1),
+            self.assertTrue(cur.rowcount in (-1,1),
                 'cursor.rowcount should == number of rows returned, or '
                 'set to -1 after executing a select statement'
                 )
@@ -380,7 +380,7 @@
         cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
             self.table_prefix
             ))
-        self.failUnless(cur.rowcount in (-1,1))
+        self.assertTrue(cur.rowcount in (-1,1))
 
         if self.driver.paramstyle == 'qmark':
             cur.execute(
@@ -409,7 +409,7 @@
                 )
         else:
             self.fail('Invalid paramstyle')
-        self.failUnless(cur.rowcount in (-1,1))
+        self.assertTrue(cur.rowcount in (-1,1))
 
         cur.execute('select name from %sbooze' % self.table_prefix)
         res = cur.fetchall()
@@ -461,7 +461,7 @@
                     )
             else:
                 self.fail('Unknown paramstyle')
-            self.failUnless(cur.rowcount in (-1,2),
+            self.assertTrue(cur.rowcount in (-1,2),
                 'insert using cursor.executemany set cursor.rowcount to '
                 'incorrect value %r' % cur.rowcount
                 )
@@ -496,7 +496,7 @@
                 'cursor.fetchone should return None if a query retrieves '
                 'no rows'
                 )
-            self.failUnless(cur.rowcount in (-1,0))
+            self.assertTrue(cur.rowcount in (-1,0))
 
             # cursor.fetchone should raise an Error if called after
             # executing a query that cannnot return rows
@@ -516,7 +516,7 @@
             self.assertEqual(cur.fetchone(),None,
                 'cursor.fetchone should return None if no more rows available'
                 )
-            self.failUnless(cur.rowcount in (-1,1))
+            self.assertTrue(cur.rowcount in (-1,1))
         finally:
             con.close()
 
@@ -572,7 +572,7 @@
                 'cursor.fetchmany should return an empty sequence after '
                 'results are exhausted'
             )
-            self.failUnless(cur.rowcount in (-1,6))
+            self.assertTrue(cur.rowcount in (-1,6))
 
             # Same as above, using cursor.arraysize
             cur.arraysize=4
@@ -585,12 +585,12 @@
             self.assertEqual(len(r),2)
             r = cur.fetchmany() # Should be an empty sequence
             self.assertEqual(len(r),0)
-            self.failUnless(cur.rowcount in (-1,6))
+            self.assertTrue(cur.rowcount in (-1,6))
 
             cur.arraysize=6
             cur.execute('select name from %sbooze' % self.table_prefix)
             rows = cur.fetchmany() # Should get all rows
-            self.failUnless(cur.rowcount in (-1,6))
+            self.assertTrue(cur.rowcount in (-1,6))
             self.assertEqual(len(rows),6)
             self.assertEqual(len(rows),6)
             rows = [r[0] for r in rows]
@@ -607,7 +607,7 @@
                 'cursor.fetchmany should return an empty sequence if '
                 'called after the whole result set has been fetched'
                 )
-            self.failUnless(cur.rowcount in (-1,6))
+            self.assertTrue(cur.rowcount in (-1,6))
 
             self.executeDDL2(cur)
             cur.execute('select name from %sbarflys' % self.table_prefix)
@@ -616,7 +616,7 @@
                 'cursor.fetchmany should return an empty sequence if '
                 'query retrieved no rows'
                 )
-            self.failUnless(cur.rowcount in (-1,0))
+            self.assertTrue(cur.rowcount in (-1,0))
 
         finally:
             con.close()
@@ -640,7 +640,7 @@
 
             cur.execute('select name from %sbooze' % self.table_prefix)
             rows = cur.fetchall()
-            self.failUnless(cur.rowcount in (-1,len(self.samples)))
+            self.assertTrue(cur.rowcount in (-1,len(self.samples)))
             self.assertEqual(len(rows),len(self.samples),
                 'cursor.fetchall did not retrieve all rows'
                 )
@@ -656,12 +656,12 @@
                 'cursor.fetchall should return an empty list if called '
                 'after the whole result set has been fetched'
                 )
-            self.failUnless(cur.rowcount in (-1,len(self.samples)))
+            self.assertTrue(cur.rowcount in (-1,len(self.samples)))
 
             self.executeDDL2(cur)
             cur.execute('select name from %sbarflys' % self.table_prefix)
             rows = cur.fetchall()
-            self.failUnless(cur.rowcount in (-1,0))
+            self.assertTrue(cur.rowcount in (-1,0))
             self.assertEqual(len(rows),0,
                 'cursor.fetchall should return an empty list if '
                 'a select query returns no rows'
@@ -683,7 +683,7 @@
             rows23 = cur.fetchmany(2)
             rows4  = cur.fetchone()
             rows56 = cur.fetchall()
-            self.failUnless(cur.rowcount in (-1,6))
+            self.assertTrue(cur.rowcount in (-1,6))
             self.assertEqual(len(rows23),2,
                 'fetchmany returned incorrect number of rows'
                 )
@@ -762,7 +762,7 @@
         con = self._connect()
         try:
             cur = con.cursor()
-            self.failUnless(hasattr(cur,'arraysize'),
+            self.assertTrue(hasattr(cur,'arraysize'),
                 'cursor.arraysize must be defined'
                 )
         finally:
@@ -831,27 +831,27 @@
         b = self.driver.Binary('')
 
     def test_STRING(self):
-        self.failUnless(hasattr(self.driver,'STRING'),
+        self.assertTrue(hasattr(self.driver,'STRING'),
             'module.STRING must be defined'
             )
 
     def test_BINARY(self):
-        self.failUnless(hasattr(self.driver,'BINARY'),
+        self.assertTrue(hasattr(self.driver,'BINARY'),
             'module.BINARY must be defined.'
             )
 
     def test_NUMBER(self):
-        self.failUnless(hasattr(self.driver,'NUMBER'),
+        self.assertTrue(hasattr(self.driver,'NUMBER'),
             'module.NUMBER must be defined.'
             )
 
     def test_DATETIME(self):
-        self.failUnless(hasattr(self.driver,'DATETIME'),
+        self.assertTrue(hasattr(self.driver,'DATETIME'),
             'module.DATETIME must be defined.'
             )
 
     def test_ROWID(self):
-        self.failUnless(hasattr(self.driver,'ROWID'),
+        self.assertTrue(hasattr(self.driver,'ROWID'),
             'module.ROWID must be defined.'
             )
 

Modified: trunk/module/pgdb.py
==============================================================================
--- trunk/module/pgdb.py        Sun Nov 22 08:36:55 2015        (r598)
+++ trunk/module/pgdb.py        Sun Nov 22 08:59:01 2015        (r599)
@@ -70,6 +70,16 @@
 from decimal import Decimal
 from math import isnan, isinf
 
+try:
+    long
+except NameError:  # Python >= 3.0
+    long = int
+
+try:
+    basestring
+except NameError:  # Python >= 3.0
+    basestring = (str, bytes)
+
 set_decimal(Decimal)
 
 
@@ -228,9 +238,9 @@
         """Quote value depending on its type."""
         if isinstance(val, (datetime, date, time, timedelta)):
             val = str(val)
-        elif isinstance(val, unicode):
+        elif isinstance(val, str) and str is not bytes:
             val = val.encode('utf8')
-        if isinstance(val, str):
+        if isinstance(val, bytes):
             if isinstance(val, Binary):
                 val = self._cnx.escape_bytea(val)
             else:
@@ -594,16 +604,10 @@
 
     """
 
-    if frozenset.__module__ == '__builtin__':
-        def __new__(cls, values):
-            if isinstance(values, basestring):
-                values = values.split()
-            return super(pgdbType, cls).__new__(cls, values)
-    else:  # Python < 2.4
-        def __init__(self, values):
-            if isinstance(values, basestring):
-                values = values.split()
-            super(pgdbType, self).__init__(values)
+    def __new__(cls, values):
+        if isinstance(values, basestring):
+            values = values.split()
+        return super(pgdbType, cls).__new__(cls, values)
 
     def __eq__(self, other):
         if isinstance(other, basestring):
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql

Reply via email to