Title: [1025] trunk/tests: Make tests run with PostgreSQL 12
Revision
1025
Author
cito
Date
2019-10-03 19:35:34 -0400 (Thu, 03 Oct 2019)

Log Message

Make tests run with PostgreSQL 12

Skip/adapt tests that use tables with oids

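Modified Paths

trunk/tests/test_classic_connection.py
trunk/tests/test_classic_dbwrapper.py
trunk/tests/test_dbapi20.py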

Diff

Modified: trunk/tests/test_classic_connection.py (1024 => 1025)


--- trunk/tests/test_classic_connection.py	2019-10-03 19:43:29 UTC (rev 1024)
+++ trunk/tests/test_classic_connection.py	2019-10-03 23:35:34 UTC (rev 1025)
@@ -616,6 +616,44 @@
         query = self.c.query
         query("drop table if exists test_table")
         self.addCleanup(query, "drop table test_table")
+        q = "create table test_table (n integer)"
+        r = query(q)
+        self.assertIsNone(r)
+        q = "insert into test_table values (1)"
+        r = query(q)
+        self.assertIsInstance(r, str)
+        self.assertEqual(r, '1')
+        q = "insert into test_table select 2"
+        r = query(q)
+        self.assertIsInstance(r, str)
+        self.assertEqual(r, '1')
+        q = "select n from test_table where n>1"
+        r = query(q).getresult()
+        self.assertEqual(len(r), 1)
+        r = r[0]
+        self.assertEqual(len(r), 1)
+        r = r[0]
+        self.assertIsInstance(r, int)
+        self.assertEqual(r, 2)
+        q = "insert into test_table select 3 union select 4 union select 5"
+        r = query(q)
+        self.assertIsInstance(r, str)
+        self.assertEqual(r, '3')
+        q = "update test_table set n=4 where n<5"
+        r = query(q)
+        self.assertIsInstance(r, str)
+        self.assertEqual(r, '4')
+        q = "delete from test_table"
+        r = query(q)
+        self.assertIsInstance(r, str)
+        self.assertEqual(r, '5')
+
+    def testQueryWithOids(self):
+        if self.c.server_version >= 120000:
+            self.skipTest("database does not support tables with oids")
+        query = self.c.query
+        query("drop table if exists test_table")
+        self.addCleanup(query, "drop table test_table")
         q = "create table test_table (n integer) with oids"
         r = query(q)
         self.assertIsNone(r)

Modified: trunk/tests/test_classic_dbwrapper.py (1024 => 1025)


--- trunk/tests/test_classic_dbwrapper.py	2019-10-03 19:43:29 UTC (rev 1024)
+++ trunk/tests/test_classic_dbwrapper.py	2019-10-03 23:35:34 UTC (rev 1025)
@@ -448,6 +448,7 @@
     @classmethod
     def setUpClass(cls):
         db = DB()
+        cls.oids = db.server_version < 120000
         db.query("drop table if exists test cascade")
         db.query("create table test ("
                  "i2 smallint, i4 integer, i8 bigint,"
@@ -499,7 +500,8 @@
         as_query = definition.startswith(('as ', 'AS '))
         if not as_query and not definition.startswith('('):
             definition = '(%s)' % definition
-        with_oids = 'with oids' if oids else 'without oids'
+        with_oids = 'with oids' if oids else (
+            'without oids' if self.oids else '')
         q = ['create', temporary, table]
         if as_query:
             q.extend([with_oids, definition])
@@ -720,23 +722,23 @@
         f(['standard_conforming_strings', 'datestyle'], ['on', 'ISO, DMY'])
         self.assertEqual(g('standard_conforming_strings'), 'on')
         self.assertEqual(g('datestyle'), 'ISO, DMY')
-        f(['default_with_oids', 'standard_conforming_strings'], 'off')
-        self.assertEqual(g('default_with_oids'), 'off')
+        f(['escape_string_warning', 'standard_conforming_strings'], 'off')
+        self.assertEqual(g('escape_string_warning'), 'off')
         self.assertEqual(g('standard_conforming_strings'), 'off')
         f(('standard_conforming_strings', 'datestyle'), ('on', 'ISO, YMD'))
         self.assertEqual(g('standard_conforming_strings'), 'on')
         self.assertEqual(g('datestyle'), 'ISO, YMD')
-        f(('default_with_oids', 'standard_conforming_strings'), 'off')
-        self.assertEqual(g('default_with_oids'), 'off')
+        f(('escape_string_warning', 'standard_conforming_strings'), 'off')
+        self.assertEqual(g('escape_string_warning'), 'off')
         self.assertEqual(g('standard_conforming_strings'), 'off')
-        f(set(['default_with_oids', 'standard_conforming_strings']), 'on')
-        self.assertEqual(g('default_with_oids'), 'on')
+        f(set(['escape_string_warning', 'standard_conforming_strings']), 'on')
+        self.assertEqual(g('escape_string_warning'), 'on')
         self.assertEqual(g('standard_conforming_strings'), 'on')
-        self.assertRaises(ValueError, f, set(['default_with_oids',
+        self.assertRaises(ValueError, f, set(['escape_string_warning',
             'standard_conforming_strings']), ['off', 'on'])
-        f(set(['default_with_oids', 'standard_conforming_strings']),
+        f(set(['escape_string_warning', 'standard_conforming_strings']),
             ['off', 'off'])
-        self.assertEqual(g('default_with_oids'), 'off')
+        self.assertEqual(g('escape_string_warning'), 'off')
         self.assertEqual(g('standard_conforming_strings'), 'off')
         f({'standard_conforming_strings': 'on', 'datestyle': 'ISO, YMD'})
         self.assertEqual(g('standard_conforming_strings'), 'on')
@@ -746,40 +748,40 @@
         db = DB()
         f = db.set_parameter
         g = db.get_parameter
-        r = g('default_with_oids')
+        r = g('escape_string_warning')
         self.assertIn(r, ('on', 'off'))
-        dwi, not_dwi = r, 'off' if r == 'on' else 'on'
+        esw, not_esw = r, 'off' if r == 'on' else 'on'
         r = g('standard_conforming_strings')
         self.assertIn(r, ('on', 'off'))
         scs, not_scs = r, 'off' if r == 'on' else 'on'
-        f('default_with_oids', not_dwi)
+        f('escape_string_warning', not_esw)
         f('standard_conforming_strings', not_scs)
-        self.assertEqual(g('default_with_oids'), not_dwi)
+        self.assertEqual(g('escape_string_warning'), not_esw)
         self.assertEqual(g('standard_conforming_strings'), not_scs)
-        f('default_with_oids')
+        f('escape_string_warning')
         f('standard_conforming_strings', None)
-        self.assertEqual(g('default_with_oids'), dwi)
+        self.assertEqual(g('escape_string_warning'), esw)
         self.assertEqual(g('standard_conforming_strings'), scs)
-        f('default_with_oids', not_dwi)
+        f('escape_string_warning', not_esw)
         f('standard_conforming_strings', not_scs)
-        self.assertEqual(g('default_with_oids'), not_dwi)
+        self.assertEqual(g('escape_string_warning'), not_esw)
         self.assertEqual(g('standard_conforming_strings'), not_scs)
-        f(['default_with_oids', 'standard_conforming_strings'], None)
-        self.assertEqual(g('default_with_oids'), dwi)
+        f(['escape_string_warning', 'standard_conforming_strings'], None)
+        self.assertEqual(g('escape_string_warning'), esw)
         self.assertEqual(g('standard_conforming_strings'), scs)
-        f('default_with_oids', not_dwi)
+        f('escape_string_warning', not_esw)
         f('standard_conforming_strings', not_scs)
-        self.assertEqual(g('default_with_oids'), not_dwi)
+        self.assertEqual(g('escape_string_warning'), not_esw)
         self.assertEqual(g('standard_conforming_strings'), not_scs)
-        f(('default_with_oids', 'standard_conforming_strings'))
-        self.assertEqual(g('default_with_oids'), dwi)
+        f(('escape_string_warning', 'standard_conforming_strings'))
+        self.assertEqual(g('escape_string_warning'), esw)
         self.assertEqual(g('standard_conforming_strings'), scs)
-        f('default_with_oids', not_dwi)
+        f('escape_string_warning', not_esw)
         f('standard_conforming_strings', not_scs)
-        self.assertEqual(g('default_with_oids'), not_dwi)
+        self.assertEqual(g('escape_string_warning'), not_esw)
         self.assertEqual(g('standard_conforming_strings'), not_scs)
-        f(set(['default_with_oids', 'standard_conforming_strings']))
-        self.assertEqual(g('default_with_oids'), dwi)
+        f(set(['escape_string_warning', 'standard_conforming_strings']))
+        self.assertEqual(g('escape_string_warning'), esw)
         self.assertEqual(g('standard_conforming_strings'), scs)
         db.close()
 
@@ -789,18 +791,18 @@
         self.assertRaises(ValueError, f, 'all', 0)
         self.assertRaises(ValueError, f, 'all', 'off')
         g = db.get_parameter
-        r = g('default_with_oids')
+        r = g('escape_string_warning')
         self.assertIn(r, ('on', 'off'))
         dwi, not_dwi = r, 'off' if r == 'on' else 'on'
         r = g('standard_conforming_strings')
         self.assertIn(r, ('on', 'off'))
         scs, not_scs = r, 'off' if r == 'on' else 'on'
-        f('default_with_oids', not_dwi)
+        f('escape_string_warning', not_dwi)
         f('standard_conforming_strings', not_scs)
-        self.assertEqual(g('default_with_oids'), not_dwi)
+        self.assertEqual(g('escape_string_warning'), not_dwi)
         self.assertEqual(g('standard_conforming_strings'), not_scs)
         f('all')
-        self.assertEqual(g('default_with_oids'), dwi)
+        self.assertEqual(g('escape_string_warning'), dwi)
         self.assertEqual(g('standard_conforming_strings'), scs)
         db.close()
 
@@ -868,6 +870,17 @@
         table = 'test hello world'
         values = [(2, "World!"), (1, "Hello")]
         self.createTable(table, "n smallint, t varchar",
+                         temporary=True, oids=False, values=values)
+        r = self.db.query('select t from "%s" order by n' % table).getresult()
+        r = ', '.join(row[0] for row in r)
+        self.assertEqual(r, "Hello, World!")
+
+    def testCreateTableWithOids(self):
+        if not self.oids:
+            self.skipTest("database does not support tables with oids")
+        table = 'test hello world'
+        values = [(2, "World!"), (1, "Hello")]
+        self.createTable(table, "n smallint, t varchar",
                          temporary=True, oids=True, values=values)
         r = self.db.query('select t from "%s" order by n' % table).getresult()
         r = ', '.join(row[0] for row in r)
@@ -878,6 +891,41 @@
     def testQuery(self):
         query = self.db.query
         table = 'test_table'
+        self.createTable(table, "n integer", oids=False)
+        q = "insert into test_table values (1)"
+        r = query(q)
+        self.assertIsInstance(r, str)
+        self.assertEqual(r, '1')
+        q = "insert into test_table select 2"
+        r = query(q)
+        self.assertIsInstance(r, str)
+        self.assertEqual(r, '1')
+        q = "select n from test_table where n>1"
+        r = query(q).getresult()
+        self.assertEqual(len(r), 1)
+        r = r[0]
+        self.assertEqual(len(r), 1)
+        r = r[0]
+        self.assertIsInstance(r, int)
+        self.assertEqual(r, 2)
+        q = "insert into test_table select 3 union select 4 union select 5"
+        r = query(q)
+        self.assertIsInstance(r, str)
+        self.assertEqual(r, '3')
+        q = "update test_table set n=4 where n<5"
+        r = query(q)
+        self.assertIsInstance(r, str)
+        self.assertEqual(r, '4')
+        q = "delete from test_table"
+        r = query(q)
+        self.assertIsInstance(r, str)
+        self.assertEqual(r, '5')
+
+    def testQueryWithOids(self):
+        if not self.oids:
+            self.skipTest("database does not support tables with oids")
+        query = self.db.query
+        table = 'test_table'
         self.createTable(table, "n integer", oids=True)
         q = "insert into test_table values (1)"
         r = query(q)
@@ -914,14 +962,14 @@
 
     def testQueryWithParams(self):
         query = self.db.query
-        self.createTable('test_table', 'n1 integer, n2 integer', oids=True)
+        self.createTable('test_table', 'n1 integer, n2 integer', oids=False)
         q = "insert into test_table values ($1, $2)"
         r = query(q, (1, 2))
-        self.assertIsInstance(r, int)
+        self.assertEqual(r, '1')
         r = query(q, [3, 4])
-        self.assertIsInstance(r, int)
+        self.assertEqual(r, '1')
         r = query(q, [5, 6])
-        self.assertIsInstance(r, int)
+        self.assertEqual(r, '1')
         q = "select * from test_table order by 1, 2"
         self.assertEqual(query(q).getresult(),
                          [(1, 2), (3, 4), (5, 6)])
@@ -1294,7 +1342,7 @@
                          ' x smallint, y smallint, z smallint,'
                          ' Normal_NaMe smallint, "Special Name" smallint,'
                          ' t text, u char(2), v varchar(2),'
-                         ' primary key (y, u)', oids=True)
+                         ' primary key (y, u)')
         r = get_attnames(table)
         self.assertIsInstance(r, dict)
         if self.regtypes:
@@ -1304,13 +1352,13 @@
                 'm': 'money', 'normal_name': 'smallint',
                 'Special Name': 'smallint', 'u': 'character',
                 't': 'text', 'v': 'character varying', 'y': 'smallint',
-                'x': 'smallint', 'z': 'smallint', 'oid': 'oid'})
+                'x': 'smallint', 'z': 'smallint'})
         else:
             self.assertEqual(r, {'a': 'int', 'b': 'int', 'c': 'int',
                  'e': 'num', 'f': 'float', 'f2': 'float', 'm': 'money',
                  'normal_name': 'int', 'Special Name': 'int',
                  'u': 'text', 't': 'text', 'v': 'text',
-                 'y': 'int', 'x': 'int', 'z': 'int', 'oid': 'int'})
+                 'y': 'int', 'x': 'int', 'z': 'int'})
 
     def testGetAttnamesWithRegtypes(self):
         get_attnames = self.db.get_attnames
@@ -1520,7 +1568,9 @@
         s.pop('n')
         self.assertRaises(KeyError, get, table, s)
 
-    def testGetWithOid(self):
+    def testGetWithOids(self):
+        if not self.oids:
+            self.skipTest("database does not support tables with oids")
         get = self.db.get
         query = self.db.query
         table = 'get_with_oid_test_table'
@@ -1680,8 +1730,7 @@
             'i2 smallint, i4 integer, i8 bigint,'
             ' d numeric, f4 real, f8 double precision, m money,'
             ' v4 varchar(4), c4 char(4), t text,'
-            ' b boolean, ts timestamp', oids=True)
-        oid_table = 'oid(%s)' % table
+            ' b boolean, ts timestamp')
         tests = [dict(i2=None, i4=None, i8=None),
              (dict(i2='', i4='', i8=''), dict(i2=None, i4=None, i8=None)),
              (dict(i2=0, i4=0, i8=0), dict(i2=0, i4=0, i8=0)),
@@ -1698,7 +1747,7 @@
              dict(d=Decimal('123456789.9876543212345678987654321')),
              dict(m=None), (dict(m=''), dict(m=None)),
              dict(m=Decimal('-1234.56')),
-             (dict(m=('-1234.56')), dict(m=Decimal('-1234.56'))),
+             (dict(m='-1234.56'), dict(m=Decimal('-1234.56'))),
              dict(m=Decimal('1234.56')), dict(m=Decimal('123456')),
              (dict(m='1234.56'), dict(m=Decimal('1234.56'))),
              (dict(m=1234.5), dict(m=Decimal('1234.5'))),
@@ -1745,9 +1794,6 @@
                 if m is not None:
                     expect['m'] = decimal(m)
             self.assertEqual(insert(table, data), data)
-            self.assertIn(oid_table, data)
-            oid = data[oid_table]
-            self.assertIsInstance(oid, int)
             data = dict(item for item in data.items()
                         if item[0] in expect)
             ts = expect.get('ts')
@@ -1761,15 +1807,14 @@
                     ts = datetime.strptime(ts, '%Y-%m-%d %H:%M:%S')
                 expect['ts'] = ts
             self.assertEqual(data, expect)
-            data = query(
-                'select oid,* from "%s"' % table).dictresult()[0]
-            self.assertEqual(data['oid'], oid)
-            data = dict(item for item in data.items()
-                        if item[0] in expect)
+            data = query('select * from "%s"' % table).dictresult()[0]
+            data = dict(item for item in data.items() if item[0] in expect)
             self.assertEqual(data, expect)
             query('delete from "%s"' % table)
 
-    def testInsertWithOid(self):
+    def testInsertWithOids(self):
+        if not self.oids:
+            self.skipTest("database does not support tables with oids")
         insert = self.db.insert
         query = self.db.query
         self.createTable('test_table', 'n int', oids=True)
@@ -1894,10 +1939,10 @@
         self.assertRaises(pg.ProgrammingError, update,
                           'test', i2=2, i4=4, i8=8)
         table = 'update_test_table'
-        self.createTable(table, 'n integer, t text', oids=True,
+        self.createTable(table, 'n integer primary key, t text',
                          values=enumerate('xyz', start=1))
-        self.assertRaises(pg.ProgrammingError, self.db.get, table, 2)
-        r = self.db.get(table, 2, 'n')
+        self.assertRaises(pg.DatabaseError, self.db.get, table, 4)
+        r = self.db.get(table, 2)
         r['t'] = 'u'
         s = update(table, r)
         self.assertEqual(s, r)
@@ -1905,7 +1950,9 @@
         r = query(q).getresult()[0][0]
         self.assertEqual(r, 'u')
 
-    def testUpdateWithOid(self):
+    def testUpdateWithOids(self):
+        if not self.oids:
+            self.skipTest("database does not support tables with oids")
         update = self.db.update
         get = self.db.get
         query = self.db.query
@@ -2063,7 +2110,7 @@
         self.assertRaises(pg.ProgrammingError, upsert,
                           'test', i2=2, i4=4, i8=8)
         table = 'upsert_test_table'
-        self.createTable(table, 'n integer primary key, t text', oids=True)
+        self.createTable(table, 'n integer primary key, t text')
         s = dict(n=1, t='x')
         try:
             r = upsert(table, s)
@@ -2129,7 +2176,9 @@
         r = upsert(table, s, oid='invalid')
         self.assertIs(r, s)
 
-    def testUpsertWithOid(self):
+    def testUpsertWithOids(self):
+        if not self.oids:
+            self.skipTest("database does not support tables with oids")
         upsert = self.db.upsert
         get = self.db.get
         query = self.db.query
@@ -2217,8 +2266,8 @@
         upsert = self.db.upsert
         query = self.db.query
         table = 'upsert_test_table_2'
-        self.createTable(table,
-                         'n integer, m integer, t text, primary key (n, m)')
+        self.createTable(
+            table, 'n integer, m integer, t text, primary key (n, m)')
         s = dict(n=1, m=2, t='x')
         try:
             r = upsert(table, s)
@@ -2318,7 +2367,7 @@
         self.assertEqual(r, result)
         table = 'clear_test_table'
         self.createTable(table,
-            'n integer, f float, b boolean, d date, t text', oids=True)
+            'n integer, f float, b boolean, d date, t text')
         r = clear(table)
         result = dict(n=0, f=0, b=f, d='', t='')
         self.assertEqual(r, result)
@@ -2347,13 +2396,13 @@
         self.assertRaises(pg.ProgrammingError, delete,
                           'test', dict(i2=2, i4=4, i8=8))
         table = 'delete_test_table'
-        self.createTable(table, 'n integer, t text', oids=True,
-                         values=enumerate('xyz', start=1))
-        self.assertRaises(pg.ProgrammingError, self.db.get, table, 2)
-        r = self.db.get(table, 1, 'n')
+        self.createTable(table, 'n integer primary key, t text',
+                         oids=False, values=enumerate('xyz', start=1))
+        self.assertRaises(pg.DatabaseError, self.db.get, table, 4)
+        r = self.db.get(table, 1)
         s = delete(table, r)
         self.assertEqual(s, 1)
-        r = self.db.get(table, 3, 'n')
+        r = self.db.get(table, 3)
         s = delete(table, r)
         self.assertEqual(s, 1)
         s = delete(table, r)
@@ -2363,18 +2412,20 @@
         r = r[0]
         result = {'n': 2, 't': 'y'}
         self.assertEqual(r, result)
-        r = self.db.get(table, 2, 'n')
+        r = self.db.get(table, 2)
         s = delete(table, r)
         self.assertEqual(s, 1)
         s = delete(table, r)
         self.assertEqual(s, 0)
-        self.assertRaises(pg.DatabaseError, self.db.get, table, 2, 'n')
+        self.assertRaises(pg.DatabaseError, self.db.get, table, 2)
         # not existing columns and oid parameter should be ignored
         r.update(m=3, u='z', oid='invalid')
         s = delete(table, r)
         self.assertEqual(s, 0)
 
-    def testDeleteWithOid(self):
+    def testDeleteWithOids(self):
+        if not self.oids:
+            self.skipTest("database does not support tables with oids")
         delete = self.db.delete
         get = self.db.get
         query = self.db.query
@@ -3408,7 +3459,7 @@
     def testArrayLiteral(self):
         insert = self.db.insert
         returns_arrays = pg.get_array()
-        self.createTable('arraytest', 'i int[], t text[]', oids=True)
+        self.createTable('arraytest', 'i int[], t text[]')
         r = dict(i=[1, 2, 3], t=['a', 'b', 'c'])
         insert('arraytest', r)
         if returns_arrays:
@@ -3439,26 +3490,23 @@
 
     def testArrayOfIds(self):
         array_on = pg.get_array()
-        self.createTable('arraytest', 'c cid[], o oid[], x xid[]', oids=True)
+        self.createTable(
+            'arraytest', 'i serial primary key, c cid[], o oid[], x xid[]')
         r = self.db.get_attnames('arraytest')
         if self.regtypes:
             self.assertEqual(r, dict(
-                oid='oid', c='cid[]', o='oid[]', x='xid[]'))
+               i='integer', c='cid[]', o='oid[]', x='xid[]'))
         else:
             self.assertEqual(r, dict(
-                oid='int', c='int[]', o='int[]', x='int[]'))
-        data = dict(c=[11, 12, 13], o=[21, 22, 23], x=[31, 32, 33])
+                i='int', c='int[]', o='int[]', x='int[]'))
+        data = dict(i=1, c=[11, 12, 13], o=[21, 22, 23], x=[31, 32, 33])
         r = data.copy()
         self.db.insert('arraytest', r)
-        qoid = 'oid(arraytest)'
-        oid = r.pop(qoid)
         if array_on:
             self.assertEqual(r, data)
         else:
             self.assertEqual(r['o'], '{21,22,23}')
-        r = {qoid: oid}
         self.db.get('arraytest', r)
-        self.assertEqual(oid, r.pop(qoid))
         if array_on:
             self.assertEqual(r, data)
         else:
@@ -3466,7 +3514,7 @@
 
     def testArrayOfText(self):
         array_on = pg.get_array()
-        self.createTable('arraytest', 'data text[]', oids=True)
+        self.createTable('arraytest', 'id serial primary key, data text[]')
         r = self.db.get_attnames('arraytest')
         self.assertEqual(r['data'], 'text[]')
         data = ['Hello, World!', '', None, '{a,b,c}', '"Hi!"',
@@ -3490,7 +3538,7 @@
     def testArrayOfBytea(self):
         array_on = pg.get_array()
         bytea_escaped = pg.get_bytea_escaped()
-        self.createTable('arraytest', 'data bytea[]', oids=True)
+        self.createTable('arraytest', 'id serial primary key, data bytea[]')
         r = self.db.get_attnames('arraytest')
         self.assertEqual(r['data'], 'bytea[]')
         data = [b'Hello, World!', b'', None, b'{a,b,c}', b'"Hi!"',
@@ -3518,7 +3566,8 @@
 
     def testArrayOfJson(self):
         try:
-            self.createTable('arraytest', 'data json[]', oids=True)
+            self.createTable(
+                'arraytest', 'id serial primary key, data json[]')
         except pg.ProgrammingError as error:
             if self.db.server_version < 90200:
                 self.skipTest('database does not support json')
@@ -3565,7 +3614,8 @@
 
     def testArrayOfJsonb(self):
         try:
-            self.createTable('arraytest', 'data jsonb[]', oids=True)
+            self.createTable(
+                'arraytest', 'id serial primary key, data jsonb[]')
         except pg.ProgrammingError as error:
             if self.db.server_version < 90400:
                 self.skipTest('database does not support jsonb')
@@ -3612,7 +3662,8 @@
 
     def testDeepArray(self):
         array_on = pg.get_array()
-        self.createTable('arraytest', 'data text[][][]', oids=True)
+        self.createTable(
+            'arraytest', 'id serial primary key, data text[][][]')
         r = self.db.get_attnames('arraytest')
         self.assertEqual(r['data'], 'text[]')
         data = [[['Hello, World!', '{a,b,c}', 'back\\slash']]]
@@ -3635,11 +3686,12 @@
               ' (name varchar, age smallint, married bool,'
               ' weight real, salary money)')
         self.addCleanup(query, 'drop type test_person_type')
-        self.createTable('test_person', 'person test_person_type',
-                         temporary=False, oids=True)
+        self.createTable('test_person',
+                         'id serial primary key, person test_person_type',
+                         oids=False, temporary=False)
         attnames = self.db.get_attnames('test_person')
         self.assertEqual(len(attnames), 2)
-        self.assertIn('oid', attnames)
+        self.assertIn('id', attnames)
         self.assertIn('person', attnames)
         person_typ = attnames['person']
         if self.regtypes:
@@ -3663,6 +3715,7 @@
             t, f = 't', 'f'
         person = ('John Doe', 61, t, 99.5, decimal('93456.75'))
         r = self.db.insert('test_person', None, person=person)
+        self.assertEqual(r['id'], 1)
         p = r['person']
         self.assertIsInstance(p, tuple)
         self.assertEqual(p, person)
@@ -3675,6 +3728,7 @@
         person = ('Jane Roe', 59, f, 64.5, decimal('96543.25'))
         r['person'] = person
         self.db.update('test_person', r)
+        self.assertEqual(r['id'], 1)
         p = r['person']
         self.assertIsInstance(p, tuple)
         self.assertEqual(p, person)
@@ -3686,6 +3740,7 @@
         self.assertIsInstance(p.salary, decimal)
         r['person'] = None
         self.db.get('test_person', r)
+        self.assertEqual(r['id'], 1)
         p = r['person']
         self.assertIsInstance(p, tuple)
         self.assertEqual(p, person)
@@ -3697,6 +3752,7 @@
         self.assertIsInstance(p.salary, decimal)
         person = (None,) * 5
         r = self.db.insert('test_person', None, person=person)
+        self.assertEqual(r['id'], 2)
         p = r['person']
         self.assertIsInstance(p, tuple)
         self.assertIsNone(p.name)
@@ -3706,6 +3762,7 @@
         self.assertIsNone(p.salary)
         r['person'] = None
         self.db.get('test_person', r)
+        self.assertEqual(r['id'], 2)
         p = r['person']
         self.assertIsInstance(p, tuple)
         self.assertIsNone(p.name)
@@ -3714,9 +3771,11 @@
         self.assertIsNone(p.weight)
         self.assertIsNone(p.salary)
         r = self.db.insert('test_person', None, person=None)
+        self.assertEqual(r['id'], 3)
         self.assertIsNone(r['person'])
         r['person'] = None
         self.db.get('test_person', r)
+        self.assertEqual(r['id'], 3)
         self.assertIsNone(r['person'])
 
     def testRecordInsertBytea(self):
@@ -3725,7 +3784,7 @@
               ' (name text, picture bytea)')
         self.addCleanup(query, 'drop type test_person_type')
         self.createTable('test_person', 'person test_person_type',
-                         temporary=False, oids=True)
+                         temporary=False)
         person_typ = self.db.get_attnames('test_person')['person']
         self.assertEqual(person_typ.attnames,
                          dict(name='text', picture='bytea'))
@@ -3750,7 +3809,7 @@
             self.fail(str(error))
         self.addCleanup(query, 'drop type test_person_type')
         self.createTable('test_person', 'person test_person_type',
-                         temporary=False, oids=True)
+                         temporary=False)
         person_typ = self.db.get_attnames('test_person')['person']
         self.assertEqual(person_typ.attnames,
                          dict(name='text', data=''))
@@ -3772,7 +3831,7 @@
               ' (name varchar, age smallint)')
         self.addCleanup(query, 'drop type test_person_type')
         self.createTable('test_person', 'person test_person_type',
-                         temporary=False, oids=True)
+                         temporary=False)
         person_typ = self.db.get_attnames('test_person')['person']
         if self.regtypes:
             self.assertEqual(person_typ, 'test_person_type')
@@ -4010,12 +4069,12 @@
         attnames = typ.attnames
         self.assertIsInstance(attnames, dict)
         self.assertIs(attnames, dbtypes.get_attnames('pg_type'))
-        self.assertEqual(list(attnames)[0], 'typname')
+        self.assertIn('typname', attnames)
         typname = attnames['typname']
         self.assertEqual(typname, 'name' if self.regtypes else 'text')
         self.assertEqual(typname.typtype, 'b')  # base
         self.assertEqual(typname.category, 'S')  # string
-        self.assertEqual(list(attnames)[3], 'typlen')
+        self.assertIn('typlen', attnames)
         typlen = attnames['typlen']
         self.assertEqual(typlen, 'smallint' if self.regtypes else 'int')
         self.assertEqual(typlen.typtype, 'b')  # base
@@ -4267,14 +4326,12 @@
         self.assertRaises(TypeError, format_query,
             '%s,%s', dict(i1=1, i2=2), dict(i1='int2'))
         values = dict(i=3, f=7.5, t='hello', b=True)
-        types = dict(i='int4', f='float4',
-            t='text', b='bool')
+        types = dict(i='int4', f='float4', t='text', b='bool')
         sql, params = format_query(
             "select %(i)s,%(f)s,%(t)s,%(b)s", values, types)
         self.assertEqual(sql, 'select $3,$2,$4,$1')
         self.assertEqual(params, ['t', 7.5, 3, 'hello'])
-        types = dict(i='bool', f='bool',
-            t='bool', b='bool')
+        types = dict(i='bool', f='bool', t='bool', b='bool')
         sql, params = format_query(
             "select %(i)s,%(f)s,%(t)s,%(b)s", values, types)
         self.assertEqual(sql, 'select $3,$2,$4,$1')
@@ -4437,6 +4494,7 @@
     @classmethod
     def setUpClass(cls):
         db = DB()
+        cls.with_oids = "with oids" if db.server_version < 120000 else ""
         query = db.query
         for num_schema in range(5):
             if num_schema:
@@ -4452,10 +4510,10 @@
                 schema = "public"
                 query("drop table if exists %s.t" % (schema,))
                 query("drop table if exists %s.t%d" % (schema, num_schema))
-            query("create table %s.t with oids as select 1 as n, %d as d"
-                  % (schema, num_schema))
-            query("create table %s.t%d with oids as select 1 as n, %d as d"
-                  % (schema, num_schema, num_schema))
+            query("create table %s.t %s as select 1 as n, %d as d"
+                  % (schema, cls.with_oids, num_schema))
+            query("create table %s.t%d %s as select 1 as n, %d as d"
+                  % (schema, num_schema, cls.with_oids, num_schema))
         db.close()
         cls.cls_set_up = True
 
@@ -4495,7 +4553,9 @@
     def testGetAttnames(self):
         get_attnames = self.db.get_attnames
         query = self.db.query
-        result = {'oid': 'int', 'd': 'int', 'n': 'int'}
+        result = {'d': 'int', 'n': 'int'}
+        if self.with_oids:
+            result['oid'] = 'int'
         r = get_attnames("t")
         self.assertEqual(r, result)
         r = get_attnames("s4.t4")
@@ -4502,8 +4562,10 @@
         self.assertEqual(r, result)
         query("drop table if exists s3.t3m")
         self.addCleanup(query, "drop table s3.t3m")
-        query("create table s3.t3m with oids as select 1 as m")
-        result_m = {'oid': 'int', 'm': 'int'}
+        query("create table s3.t3m %s as select 1 as m" % (self.with_oids,))
+        result_m = {'m': 'int'}
+        if self.with_oids:
+            result_m['oid'] = 'int'
         r = get_attnames("s3.t3m")
         self.assertEqual(r, result_m)
         query("set search_path to s1,s3")
@@ -4540,13 +4602,22 @@
         get = self.db.get
         query = self.db.query
         r = get("t", 1, 'n')
-        self.assertIn('oid(t)', r)
+        if self.with_oids:
+            self.assertIn('oid(t)', r)
+        else:
+            self.assertNotIn('oid(t)', r)
         query("set search_path to s2")
         r = get("t2", 1, 'n')
-        self.assertIn('oid(t2)', r)
+        if self.with_oids:
+            self.assertIn('oid(t2)', r)
+        else:
+            self.assertNotIn('oid(t2)', r)
         query("set search_path to s3")
         r = get("t", 1, 'n')
-        self.assertIn('oid(t)', r)
+        if self.with_oids:
+            self.assertIn('oid(t)', r)
+        else:
+            self.assertNotIn('oid(t)', r)
 
 
 class TestDebug(unittest.TestCase):

Modified: trunk/tests/test_dbapi20.py (1024 => 1025)


--- trunk/tests/test_dbapi20.py	2019-10-03 19:43:29 UTC (rev 1024)
+++ trunk/tests/test_dbapi20.py	2019-10-03 23:35:34 UTC (rev 1025)
@@ -357,6 +357,8 @@
             self.assertEqual(type_info.type, 'c')  # composite
             self.assertEqual(type_info.category, 'C')  # composite
             cols = type_cache.get_fields('pg_type')
+            if cols[0].name == 'oid':  # PostgreSQL < 12
+                del cols[0]
             self.assertEqual(cols[0].name, 'typname')
             typname = type_cache[cols[0].type]
             self.assertEqual(typname, 'name')
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo/pygresql

Reply via email to