Modified: trunk/tests/test_classic_dbwrapper.py (1024 => 1025)
--- trunk/tests/test_classic_dbwrapper.py 2019-10-03 19:43:29 UTC (rev 1024)
+++ trunk/tests/test_classic_dbwrapper.py 2019-10-03 23:35:34 UTC (rev 1025)
@@ -448,6 +448,7 @@
@classmethod
def setUpClass(cls):
db = DB()
+ cls.oids = db.server_version < 120000
db.query("drop table if exists test cascade")
db.query("create table test ("
"i2 smallint, i4 integer, i8 bigint,"
@@ -499,7 +500,8 @@
as_query = definition.startswith(('as ', 'AS '))
if not as_query and not definition.startswith('('):
definition = '(%s)' % definition
- with_oids = 'with oids' if oids else 'without oids'
+ with_oids = 'with oids' if oids else (
+ 'without oids' if self.oids else '')
q = ['create', temporary, table]
if as_query:
q.extend([with_oids, definition])
@@ -720,23 +722,23 @@
f(['standard_conforming_strings', 'datestyle'], ['on', 'ISO, DMY'])
self.assertEqual(g('standard_conforming_strings'), 'on')
self.assertEqual(g('datestyle'), 'ISO, DMY')
- f(['default_with_oids', 'standard_conforming_strings'], 'off')
- self.assertEqual(g('default_with_oids'), 'off')
+ f(['escape_string_warning', 'standard_conforming_strings'], 'off')
+ self.assertEqual(g('escape_string_warning'), 'off')
self.assertEqual(g('standard_conforming_strings'), 'off')
f(('standard_conforming_strings', 'datestyle'), ('on', 'ISO, YMD'))
self.assertEqual(g('standard_conforming_strings'), 'on')
self.assertEqual(g('datestyle'), 'ISO, YMD')
- f(('default_with_oids', 'standard_conforming_strings'), 'off')
- self.assertEqual(g('default_with_oids'), 'off')
+ f(('escape_string_warning', 'standard_conforming_strings'), 'off')
+ self.assertEqual(g('escape_string_warning'), 'off')
self.assertEqual(g('standard_conforming_strings'), 'off')
- f(set(['default_with_oids', 'standard_conforming_strings']), 'on')
- self.assertEqual(g('default_with_oids'), 'on')
+ f(set(['escape_string_warning', 'standard_conforming_strings']), 'on')
+ self.assertEqual(g('escape_string_warning'), 'on')
self.assertEqual(g('standard_conforming_strings'), 'on')
- self.assertRaises(ValueError, f, set(['default_with_oids',
+ self.assertRaises(ValueError, f, set(['escape_string_warning',
'standard_conforming_strings']), ['off', 'on'])
- f(set(['default_with_oids', 'standard_conforming_strings']),
+ f(set(['escape_string_warning', 'standard_conforming_strings']),
['off', 'off'])
- self.assertEqual(g('default_with_oids'), 'off')
+ self.assertEqual(g('escape_string_warning'), 'off')
self.assertEqual(g('standard_conforming_strings'), 'off')
f({'standard_conforming_strings': 'on', 'datestyle': 'ISO, YMD'})
self.assertEqual(g('standard_conforming_strings'), 'on')
@@ -746,40 +748,40 @@
db = DB()
f = db.set_parameter
g = db.get_parameter
- r = g('default_with_oids')
+ r = g('escape_string_warning')
self.assertIn(r, ('on', 'off'))
- dwi, not_dwi = r, 'off' if r == 'on' else 'on'
+ esw, not_esw = r, 'off' if r == 'on' else 'on'
r = g('standard_conforming_strings')
self.assertIn(r, ('on', 'off'))
scs, not_scs = r, 'off' if r == 'on' else 'on'
- f('default_with_oids', not_dwi)
+ f('escape_string_warning', not_esw)
f('standard_conforming_strings', not_scs)
- self.assertEqual(g('default_with_oids'), not_dwi)
+ self.assertEqual(g('escape_string_warning'), not_esw)
self.assertEqual(g('standard_conforming_strings'), not_scs)
- f('default_with_oids')
+ f('escape_string_warning')
f('standard_conforming_strings', None)
- self.assertEqual(g('default_with_oids'), dwi)
+ self.assertEqual(g('escape_string_warning'), esw)
self.assertEqual(g('standard_conforming_strings'), scs)
- f('default_with_oids', not_dwi)
+ f('escape_string_warning', not_esw)
f('standard_conforming_strings', not_scs)
- self.assertEqual(g('default_with_oids'), not_dwi)
+ self.assertEqual(g('escape_string_warning'), not_esw)
self.assertEqual(g('standard_conforming_strings'), not_scs)
- f(['default_with_oids', 'standard_conforming_strings'], None)
- self.assertEqual(g('default_with_oids'), dwi)
+ f(['escape_string_warning', 'standard_conforming_strings'], None)
+ self.assertEqual(g('escape_string_warning'), esw)
self.assertEqual(g('standard_conforming_strings'), scs)
- f('default_with_oids', not_dwi)
+ f('escape_string_warning', not_esw)
f('standard_conforming_strings', not_scs)
- self.assertEqual(g('default_with_oids'), not_dwi)
+ self.assertEqual(g('escape_string_warning'), not_esw)
self.assertEqual(g('standard_conforming_strings'), not_scs)
- f(('default_with_oids', 'standard_conforming_strings'))
- self.assertEqual(g('default_with_oids'), dwi)
+ f(('escape_string_warning', 'standard_conforming_strings'))
+ self.assertEqual(g('escape_string_warning'), esw)
self.assertEqual(g('standard_conforming_strings'), scs)
- f('default_with_oids', not_dwi)
+ f('escape_string_warning', not_esw)
f('standard_conforming_strings', not_scs)
- self.assertEqual(g('default_with_oids'), not_dwi)
+ self.assertEqual(g('escape_string_warning'), not_esw)
self.assertEqual(g('standard_conforming_strings'), not_scs)
- f(set(['default_with_oids', 'standard_conforming_strings']))
- self.assertEqual(g('default_with_oids'), dwi)
+ f(set(['escape_string_warning', 'standard_conforming_strings']))
+ self.assertEqual(g('escape_string_warning'), esw)
self.assertEqual(g('standard_conforming_strings'), scs)
db.close()
@@ -789,18 +791,18 @@
self.assertRaises(ValueError, f, 'all', 0)
self.assertRaises(ValueError, f, 'all', 'off')
g = db.get_parameter
- r = g('default_with_oids')
+ r = g('escape_string_warning')
self.assertIn(r, ('on', 'off'))
dwi, not_dwi = r, 'off' if r == 'on' else 'on'
r = g('standard_conforming_strings')
self.assertIn(r, ('on', 'off'))
scs, not_scs = r, 'off' if r == 'on' else 'on'
- f('default_with_oids', not_dwi)
+ f('escape_string_warning', not_dwi)
f('standard_conforming_strings', not_scs)
- self.assertEqual(g('default_with_oids'), not_dwi)
+ self.assertEqual(g('escape_string_warning'), not_dwi)
self.assertEqual(g('standard_conforming_strings'), not_scs)
f('all')
- self.assertEqual(g('default_with_oids'), dwi)
+ self.assertEqual(g('escape_string_warning'), dwi)
self.assertEqual(g('standard_conforming_strings'), scs)
db.close()
@@ -868,6 +870,17 @@
table = 'test hello world'
values = [(2, "World!"), (1, "Hello")]
self.createTable(table, "n smallint, t varchar",
+ temporary=True, oids=False, values=values)
+ r = self.db.query('select t from "%s" order by n' % table).getresult()
+ r = ', '.join(row[0] for row in r)
+ self.assertEqual(r, "Hello, World!")
+
+ def testCreateTableWithOids(self):
+ if not self.oids:
+ self.skipTest("database does not support tables with oids")
+ table = 'test hello world'
+ values = [(2, "World!"), (1, "Hello")]
+ self.createTable(table, "n smallint, t varchar",
temporary=True, oids=True, values=values)
r = self.db.query('select t from "%s" order by n' % table).getresult()
r = ', '.join(row[0] for row in r)
@@ -878,6 +891,41 @@
def testQuery(self):
query = self.db.query
table = 'test_table'
+ self.createTable(table, "n integer", oids=False)
+ q = "insert into test_table values (1)"
+ r = query(q)
+ self.assertIsInstance(r, str)
+ self.assertEqual(r, '1')
+ q = "insert into test_table select 2"
+ r = query(q)
+ self.assertIsInstance(r, str)
+ self.assertEqual(r, '1')
+ q = "select n from test_table where n>1"
+ r = query(q).getresult()
+ self.assertEqual(len(r), 1)
+ r = r[0]
+ self.assertEqual(len(r), 1)
+ r = r[0]
+ self.assertIsInstance(r, int)
+ self.assertEqual(r, 2)
+ q = "insert into test_table select 3 union select 4 union select 5"
+ r = query(q)
+ self.assertIsInstance(r, str)
+ self.assertEqual(r, '3')
+ q = "update test_table set n=4 where n<5"
+ r = query(q)
+ self.assertIsInstance(r, str)
+ self.assertEqual(r, '4')
+ q = "delete from test_table"
+ r = query(q)
+ self.assertIsInstance(r, str)
+ self.assertEqual(r, '5')
+
+ def testQueryWithOids(self):
+ if not self.oids:
+ self.skipTest("database does not support tables with oids")
+ query = self.db.query
+ table = 'test_table'
self.createTable(table, "n integer", oids=True)
q = "insert into test_table values (1)"
r = query(q)
@@ -914,14 +962,14 @@
def testQueryWithParams(self):
query = self.db.query
- self.createTable('test_table', 'n1 integer, n2 integer', oids=True)
+ self.createTable('test_table', 'n1 integer, n2 integer', oids=False)
q = "insert into test_table values ($1, $2)"
r = query(q, (1, 2))
- self.assertIsInstance(r, int)
+ self.assertEqual(r, '1')
r = query(q, [3, 4])
- self.assertIsInstance(r, int)
+ self.assertEqual(r, '1')
r = query(q, [5, 6])
- self.assertIsInstance(r, int)
+ self.assertEqual(r, '1')
q = "select * from test_table order by 1, 2"
self.assertEqual(query(q).getresult(),
[(1, 2), (3, 4), (5, 6)])
@@ -1294,7 +1342,7 @@
' x smallint, y smallint, z smallint,'
' Normal_NaMe smallint, "Special Name" smallint,'
' t text, u char(2), v varchar(2),'
- ' primary key (y, u)', oids=True)
+ ' primary key (y, u)')
r = get_attnames(table)
self.assertIsInstance(r, dict)
if self.regtypes:
@@ -1304,13 +1352,13 @@
'm': 'money', 'normal_name': 'smallint',
'Special Name': 'smallint', 'u': 'character',
't': 'text', 'v': 'character varying', 'y': 'smallint',
- 'x': 'smallint', 'z': 'smallint', 'oid': 'oid'})
+ 'x': 'smallint', 'z': 'smallint'})
else:
self.assertEqual(r, {'a': 'int', 'b': 'int', 'c': 'int',
'e': 'num', 'f': 'float', 'f2': 'float', 'm': 'money',
'normal_name': 'int', 'Special Name': 'int',
'u': 'text', 't': 'text', 'v': 'text',
- 'y': 'int', 'x': 'int', 'z': 'int', 'oid': 'int'})
+ 'y': 'int', 'x': 'int', 'z': 'int'})
def testGetAttnamesWithRegtypes(self):
get_attnames = self.db.get_attnames
@@ -1520,7 +1568,9 @@
s.pop('n')
self.assertRaises(KeyError, get, table, s)
- def testGetWithOid(self):
+ def testGetWithOids(self):
+ if not self.oids:
+ self.skipTest("database does not support tables with oids")
get = self.db.get
query = self.db.query
table = 'get_with_oid_test_table'
@@ -1680,8 +1730,7 @@
'i2 smallint, i4 integer, i8 bigint,'
' d numeric, f4 real, f8 double precision, m money,'
' v4 varchar(4), c4 char(4), t text,'
- ' b boolean, ts timestamp', oids=True)
- oid_table = 'oid(%s)' % table
+ ' b boolean, ts timestamp')
tests = [dict(i2=None, i4=None, i8=None),
(dict(i2='', i4='', i8=''), dict(i2=None, i4=None, i8=None)),
(dict(i2=0, i4=0, i8=0), dict(i2=0, i4=0, i8=0)),
@@ -1698,7 +1747,7 @@
dict(d=Decimal('123456789.9876543212345678987654321')),
dict(m=None), (dict(m=''), dict(m=None)),
dict(m=Decimal('-1234.56')),
- (dict(m=('-1234.56')), dict(m=Decimal('-1234.56'))),
+ (dict(m='-1234.56'), dict(m=Decimal('-1234.56'))),
dict(m=Decimal('1234.56')), dict(m=Decimal('123456')),
(dict(m='1234.56'), dict(m=Decimal('1234.56'))),
(dict(m=1234.5), dict(m=Decimal('1234.5'))),
@@ -1745,9 +1794,6 @@
if m is not None:
expect['m'] = decimal(m)
self.assertEqual(insert(table, data), data)
- self.assertIn(oid_table, data)
- oid = data[oid_table]
- self.assertIsInstance(oid, int)
data = dict(item for item in data.items()
if item[0] in expect)
ts = expect.get('ts')
@@ -1761,15 +1807,14 @@
ts = datetime.strptime(ts, '%Y-%m-%d %H:%M:%S')
expect['ts'] = ts
self.assertEqual(data, expect)
- data = query(
- 'select oid,* from "%s"' % table).dictresult()[0]
- self.assertEqual(data['oid'], oid)
- data = dict(item for item in data.items()
- if item[0] in expect)
+ data = query('select * from "%s"' % table).dictresult()[0]
+ data = dict(item for item in data.items() if item[0] in expect)
self.assertEqual(data, expect)
query('delete from "%s"' % table)
- def testInsertWithOid(self):
+ def testInsertWithOids(self):
+ if not self.oids:
+ self.skipTest("database does not support tables with oids")
insert = self.db.insert
query = self.db.query
self.createTable('test_table', 'n int', oids=True)
@@ -1894,10 +1939,10 @@
self.assertRaises(pg.ProgrammingError, update,
'test', i2=2, i4=4, i8=8)
table = 'update_test_table'
- self.createTable(table, 'n integer, t text', oids=True,
+ self.createTable(table, 'n integer primary key, t text',
values=enumerate('xyz', start=1))
- self.assertRaises(pg.ProgrammingError, self.db.get, table, 2)
- r = self.db.get(table, 2, 'n')
+ self.assertRaises(pg.DatabaseError, self.db.get, table, 4)
+ r = self.db.get(table, 2)
r['t'] = 'u'
s = update(table, r)
self.assertEqual(s, r)
@@ -1905,7 +1950,9 @@
r = query(q).getresult()[0][0]
self.assertEqual(r, 'u')
- def testUpdateWithOid(self):
+ def testUpdateWithOids(self):
+ if not self.oids:
+ self.skipTest("database does not support tables with oids")
update = self.db.update
get = self.db.get
query = self.db.query
@@ -2063,7 +2110,7 @@
self.assertRaises(pg.ProgrammingError, upsert,
'test', i2=2, i4=4, i8=8)
table = 'upsert_test_table'
- self.createTable(table, 'n integer primary key, t text', oids=True)
+ self.createTable(table, 'n integer primary key, t text')
s = dict(n=1, t='x')
try:
r = upsert(table, s)
@@ -2129,7 +2176,9 @@
r = upsert(table, s, oid='invalid')
self.assertIs(r, s)
- def testUpsertWithOid(self):
+ def testUpsertWithOids(self):
+ if not self.oids:
+ self.skipTest("database does not support tables with oids")
upsert = self.db.upsert
get = self.db.get
query = self.db.query
@@ -2217,8 +2266,8 @@
upsert = self.db.upsert
query = self.db.query
table = 'upsert_test_table_2'
- self.createTable(table,
- 'n integer, m integer, t text, primary key (n, m)')
+ self.createTable(
+ table, 'n integer, m integer, t text, primary key (n, m)')
s = dict(n=1, m=2, t='x')
try:
r = upsert(table, s)
@@ -2318,7 +2367,7 @@
self.assertEqual(r, result)
table = 'clear_test_table'
self.createTable(table,
- 'n integer, f float, b boolean, d date, t text', oids=True)
+ 'n integer, f float, b boolean, d date, t text')
r = clear(table)
result = dict(n=0, f=0, b=f, d='', t='')
self.assertEqual(r, result)
@@ -2347,13 +2396,13 @@
self.assertRaises(pg.ProgrammingError, delete,
'test', dict(i2=2, i4=4, i8=8))
table = 'delete_test_table'
- self.createTable(table, 'n integer, t text', oids=True,
- values=enumerate('xyz', start=1))
- self.assertRaises(pg.ProgrammingError, self.db.get, table, 2)
- r = self.db.get(table, 1, 'n')
+ self.createTable(table, 'n integer primary key, t text',
+ oids=False, values=enumerate('xyz', start=1))
+ self.assertRaises(pg.DatabaseError, self.db.get, table, 4)
+ r = self.db.get(table, 1)
s = delete(table, r)
self.assertEqual(s, 1)
- r = self.db.get(table, 3, 'n')
+ r = self.db.get(table, 3)
s = delete(table, r)
self.assertEqual(s, 1)
s = delete(table, r)
@@ -2363,18 +2412,20 @@
r = r[0]
result = {'n': 2, 't': 'y'}
self.assertEqual(r, result)
- r = self.db.get(table, 2, 'n')
+ r = self.db.get(table, 2)
s = delete(table, r)
self.assertEqual(s, 1)
s = delete(table, r)
self.assertEqual(s, 0)
- self.assertRaises(pg.DatabaseError, self.db.get, table, 2, 'n')
+ self.assertRaises(pg.DatabaseError, self.db.get, table, 2)
# not existing columns and oid parameter should be ignored
r.update(m=3, u='z', oid='invalid')
s = delete(table, r)
self.assertEqual(s, 0)
- def testDeleteWithOid(self):
+ def testDeleteWithOids(self):
+ if not self.oids:
+ self.skipTest("database does not support tables with oids")
delete = self.db.delete
get = self.db.get
query = self.db.query
@@ -3408,7 +3459,7 @@
def testArrayLiteral(self):
insert = self.db.insert
returns_arrays = pg.get_array()
- self.createTable('arraytest', 'i int[], t text[]', oids=True)
+ self.createTable('arraytest', 'i int[], t text[]')
r = dict(i=[1, 2, 3], t=['a', 'b', 'c'])
insert('arraytest', r)
if returns_arrays:
@@ -3439,26 +3490,23 @@
def testArrayOfIds(self):
array_on = pg.get_array()
- self.createTable('arraytest', 'c cid[], o oid[], x xid[]', oids=True)
+ self.createTable(
+ 'arraytest', 'i serial primary key, c cid[], o oid[], x xid[]')
r = self.db.get_attnames('arraytest')
if self.regtypes:
self.assertEqual(r, dict(
- oid='oid', c='cid[]', o='oid[]', x='xid[]'))
+ i='integer', c='cid[]', o='oid[]', x='xid[]'))
else:
self.assertEqual(r, dict(
- oid='int', c='int[]', o='int[]', x='int[]'))
- data = dict(c=[11, 12, 13], o=[21, 22, 23], x=[31, 32, 33])
+ i='int', c='int[]', o='int[]', x='int[]'))
+ data = dict(i=1, c=[11, 12, 13], o=[21, 22, 23], x=[31, 32, 33])
r = data.copy()
self.db.insert('arraytest', r)
- qoid = 'oid(arraytest)'
- oid = r.pop(qoid)
if array_on:
self.assertEqual(r, data)
else:
self.assertEqual(r['o'], '{21,22,23}')
- r = {qoid: oid}
self.db.get('arraytest', r)
- self.assertEqual(oid, r.pop(qoid))
if array_on:
self.assertEqual(r, data)
else:
@@ -3466,7 +3514,7 @@
def testArrayOfText(self):
array_on = pg.get_array()
- self.createTable('arraytest', 'data text[]', oids=True)
+ self.createTable('arraytest', 'id serial primary key, data text[]')
r = self.db.get_attnames('arraytest')
self.assertEqual(r['data'], 'text[]')
data = ['Hello, World!', '', None, '{a,b,c}', '"Hi!"',
@@ -3490,7 +3538,7 @@
def testArrayOfBytea(self):
array_on = pg.get_array()
bytea_escaped = pg.get_bytea_escaped()
- self.createTable('arraytest', 'data bytea[]', oids=True)
+ self.createTable('arraytest', 'id serial primary key, data bytea[]')
r = self.db.get_attnames('arraytest')
self.assertEqual(r['data'], 'bytea[]')
data = [b'Hello, World!', b'', None, b'{a,b,c}', b'"Hi!"',
@@ -3518,7 +3566,8 @@
def testArrayOfJson(self):
try:
- self.createTable('arraytest', 'data json[]', oids=True)
+ self.createTable(
+ 'arraytest', 'id serial primary key, data json[]')
except pg.ProgrammingError as error:
if self.db.server_version < 90200:
self.skipTest('database does not support json')
@@ -3565,7 +3614,8 @@
def testArrayOfJsonb(self):
try:
- self.createTable('arraytest', 'data jsonb[]', oids=True)
+ self.createTable(
+ 'arraytest', 'id serial primary key, data jsonb[]')
except pg.ProgrammingError as error:
if self.db.server_version < 90400:
self.skipTest('database does not support jsonb')
@@ -3612,7 +3662,8 @@
def testDeepArray(self):
array_on = pg.get_array()
- self.createTable('arraytest', 'data text[][][]', oids=True)
+ self.createTable(
+ 'arraytest', 'id serial primary key, data text[][][]')
r = self.db.get_attnames('arraytest')
self.assertEqual(r['data'], 'text[]')
data = [[['Hello, World!', '{a,b,c}', 'back\\slash']]]
@@ -3635,11 +3686,12 @@
' (name varchar, age smallint, married bool,'
' weight real, salary money)')
self.addCleanup(query, 'drop type test_person_type')
- self.createTable('test_person', 'person test_person_type',
- temporary=False, oids=True)
+ self.createTable('test_person',
+ 'id serial primary key, person test_person_type',
+ oids=False, temporary=False)
attnames = self.db.get_attnames('test_person')
self.assertEqual(len(attnames), 2)
- self.assertIn('oid', attnames)
+ self.assertIn('id', attnames)
self.assertIn('person', attnames)
person_typ = attnames['person']
if self.regtypes:
@@ -3663,6 +3715,7 @@
t, f = 't', 'f'
person = ('John Doe', 61, t, 99.5, decimal('93456.75'))
r = self.db.insert('test_person', None, person=person)
+ self.assertEqual(r['id'], 1)
p = r['person']
self.assertIsInstance(p, tuple)
self.assertEqual(p, person)
@@ -3675,6 +3728,7 @@
person = ('Jane Roe', 59, f, 64.5, decimal('96543.25'))
r['person'] = person
self.db.update('test_person', r)
+ self.assertEqual(r['id'], 1)
p = r['person']
self.assertIsInstance(p, tuple)
self.assertEqual(p, person)
@@ -3686,6 +3740,7 @@
self.assertIsInstance(p.salary, decimal)
r['person'] = None
self.db.get('test_person', r)
+ self.assertEqual(r['id'], 1)
p = r['person']
self.assertIsInstance(p, tuple)
self.assertEqual(p, person)
@@ -3697,6 +3752,7 @@
self.assertIsInstance(p.salary, decimal)
person = (None,) * 5
r = self.db.insert('test_person', None, person=person)
+ self.assertEqual(r['id'], 2)
p = r['person']
self.assertIsInstance(p, tuple)
self.assertIsNone(p.name)
@@ -3706,6 +3762,7 @@
self.assertIsNone(p.salary)
r['person'] = None
self.db.get('test_person', r)
+ self.assertEqual(r['id'], 2)
p = r['person']
self.assertIsInstance(p, tuple)
self.assertIsNone(p.name)
@@ -3714,9 +3771,11 @@
self.assertIsNone(p.weight)
self.assertIsNone(p.salary)
r = self.db.insert('test_person', None, person=None)
+ self.assertEqual(r['id'], 3)
self.assertIsNone(r['person'])
r['person'] = None
self.db.get('test_person', r)
+ self.assertEqual(r['id'], 3)
self.assertIsNone(r['person'])
def testRecordInsertBytea(self):
@@ -3725,7 +3784,7 @@
' (name text, picture bytea)')
self.addCleanup(query, 'drop type test_person_type')
self.createTable('test_person', 'person test_person_type',
- temporary=False, oids=True)
+ temporary=False)
person_typ = self.db.get_attnames('test_person')['person']
self.assertEqual(person_typ.attnames,
dict(name='text', picture='bytea'))
@@ -3750,7 +3809,7 @@
self.fail(str(error))
self.addCleanup(query, 'drop type test_person_type')
self.createTable('test_person', 'person test_person_type',
- temporary=False, oids=True)
+ temporary=False)
person_typ = self.db.get_attnames('test_person')['person']
self.assertEqual(person_typ.attnames,
dict(name='text', data=''))
@@ -3772,7 +3831,7 @@
' (name varchar, age smallint)')
self.addCleanup(query, 'drop type test_person_type')
self.createTable('test_person', 'person test_person_type',
- temporary=False, oids=True)
+ temporary=False)
person_typ = self.db.get_attnames('test_person')['person']
if self.regtypes:
self.assertEqual(person_typ, 'test_person_type')
@@ -4010,12 +4069,12 @@
attnames = typ.attnames
self.assertIsInstance(attnames, dict)
self.assertIs(attnames, dbtypes.get_attnames('pg_type'))
- self.assertEqual(list(attnames)[0], 'typname')
+ self.assertIn('typname', attnames)
typname = attnames['typname']
self.assertEqual(typname, 'name' if self.regtypes else 'text')
self.assertEqual(typname.typtype, 'b') # base
self.assertEqual(typname.category, 'S') # string
- self.assertEqual(list(attnames)[3], 'typlen')
+ self.assertIn('typlen', attnames)
typlen = attnames['typlen']
self.assertEqual(typlen, 'smallint' if self.regtypes else 'int')
self.assertEqual(typlen.typtype, 'b') # base
@@ -4267,14 +4326,12 @@
self.assertRaises(TypeError, format_query,
'%s,%s', dict(i1=1, i2=2), dict(i1='int2'))
values = dict(i=3, f=7.5, t='hello', b=True)
- types = dict(i='int4', f='float4',
- t='text', b='bool')
+ types = dict(i='int4', f='float4', t='text', b='bool')
sql, params = format_query(
"select %(i)s,%(f)s,%(t)s,%(b)s", values, types)
self.assertEqual(sql, 'select $3,$2,$4,$1')
self.assertEqual(params, ['t', 7.5, 3, 'hello'])
- types = dict(i='bool', f='bool',
- t='bool', b='bool')
+ types = dict(i='bool', f='bool', t='bool', b='bool')
sql, params = format_query(
"select %(i)s,%(f)s,%(t)s,%(b)s", values, types)
self.assertEqual(sql, 'select $3,$2,$4,$1')
@@ -4437,6 +4494,7 @@
@classmethod
def setUpClass(cls):
db = DB()
+ cls.with_oids = "with oids" if db.server_version < 120000 else ""
query = db.query
for num_schema in range(5):
if num_schema:
@@ -4452,10 +4510,10 @@
schema = "public"
query("drop table if exists %s.t" % (schema,))
query("drop table if exists %s.t%d" % (schema, num_schema))
- query("create table %s.t with oids as select 1 as n, %d as d"
- % (schema, num_schema))
- query("create table %s.t%d with oids as select 1 as n, %d as d"
- % (schema, num_schema, num_schema))
+ query("create table %s.t %s as select 1 as n, %d as d"
+ % (schema, cls.with_oids, num_schema))
+ query("create table %s.t%d %s as select 1 as n, %d as d"
+ % (schema, num_schema, cls.with_oids, num_schema))
db.close()
cls.cls_set_up = True
@@ -4495,7 +4553,9 @@
def testGetAttnames(self):
get_attnames = self.db.get_attnames
query = self.db.query
- result = {'oid': 'int', 'd': 'int', 'n': 'int'}
+ result = {'d': 'int', 'n': 'int'}
+ if self.with_oids:
+ result['oid'] = 'int'
r = get_attnames("t")
self.assertEqual(r, result)
r = get_attnames("s4.t4")
@@ -4502,8 +4562,10 @@
self.assertEqual(r, result)
query("drop table if exists s3.t3m")
self.addCleanup(query, "drop table s3.t3m")
- query("create table s3.t3m with oids as select 1 as m")
- result_m = {'oid': 'int', 'm': 'int'}
+ query("create table s3.t3m %s as select 1 as m" % (self.with_oids,))
+ result_m = {'m': 'int'}
+ if self.with_oids:
+ result_m['oid'] = 'int'
r = get_attnames("s3.t3m")
self.assertEqual(r, result_m)
query("set search_path to s1,s3")
@@ -4540,13 +4602,22 @@
get = self.db.get
query = self.db.query
r = get("t", 1, 'n')
- self.assertIn('oid(t)', r)
+ if self.with_oids:
+ self.assertIn('oid(t)', r)
+ else:
+ self.assertNotIn('oid(t)', r)
query("set search_path to s2")
r = get("t2", 1, 'n')
- self.assertIn('oid(t2)', r)
+ if self.with_oids:
+ self.assertIn('oid(t2)', r)
+ else:
+ self.assertNotIn('oid(t2)', r)
query("set search_path to s3")
r = get("t", 1, 'n')
- self.assertIn('oid(t)', r)
+ if self.with_oids:
+ self.assertIn('oid(t)', r)
+ else:
+ self.assertNotIn('oid(t)', r)
class TestDebug(unittest.TestCase):