Author: cito
Date: Sun Jan 17 16:01:04 2016
New Revision: 763
Log:
Achieve 100% test coverage for the pg module on the trunk.
Note that some lines are only covered with certain PostgreSQL or Python
versions, so you need to run the tests with different versions to be sure.
Also added another synonym for the transaction methods (abort as an alias
for rollback), so you can now pick your favorite name for all three of them.
Modified:
branches/4.x/docs/contents/pg/db_wrapper.rst
branches/4.x/pg.py
branches/4.x/tests/test_classic_dbwrapper.py
trunk/docs/contents/pg/db_wrapper.rst
trunk/pg.py
trunk/tests/test_classic_dbwrapper.py
Modified: branches/4.x/docs/contents/pg/db_wrapper.rst
==============================================================================
--- branches/4.x/docs/contents/pg/db_wrapper.rst Sun Jan 17 11:32:18
2016 (r762)
+++ branches/4.x/docs/contents/pg/db_wrapper.rst Sun Jan 17 16:01:04
2016 (r763)
@@ -257,6 +257,12 @@
.. versionadded:: 4.1
+.. method:: DB.abort()
+
+ This is the same as the :meth:`DB.rollback` method.
+
+.. versionadded:: 4.2
+
.. method:: DB.savepoint(name)
Define a new savepoint
Modified: branches/4.x/pg.py
==============================================================================
--- branches/4.x/pg.py Sun Jan 17 11:32:18 2016 (r762)
+++ branches/4.x/pg.py Sun Jan 17 16:01:04 2016 (r763)
@@ -519,6 +519,8 @@
qstr += ' TO ' + name
return self.query(qstr)
+ abort = rollback
+
def savepoint(self, name):
"""Define a new savepoint within the current transaction."""
return self.query('SAVEPOINT ' + name)
@@ -898,7 +900,7 @@
if keyname == 'oid':
if isinstance(arg, dict):
if qoid not in arg:
- raise _db_error('%s not in arg' % qoid)
+ raise _prg_error('%s not in arg' % qoid)
else:
arg = {qoid: arg}
where = 'oid = %s' % arg[qoid]
Modified: branches/4.x/tests/test_classic_dbwrapper.py
==============================================================================
--- branches/4.x/tests/test_classic_dbwrapper.py Sun Jan 17 11:32:18
2016 (r762)
+++ branches/4.x/tests/test_classic_dbwrapper.py Sun Jan 17 16:01:04
2016 (r763)
@@ -71,6 +71,7 @@
def testAllDBAttributes(self):
attributes = [
+ 'abort',
'begin',
'cancel', 'clear', 'close', 'commit',
'db', 'dbname', 'debug', 'delete',
@@ -222,8 +223,12 @@
pass
else:
self.fail('Reset should give an error for a closed connection')
+ self.assertIsNone(self.db.db)
self.assertRaises(pg.InternalError, self.db.close)
self.assertRaises(pg.InternalError, self.db.query, 'select 1')
+ self.assertRaises(pg.InternalError, getattr, self.db, 'status')
+ self.assertRaises(pg.InternalError, getattr, self.db, 'error')
+ self.assertRaises(pg.InternalError, getattr, self.db, 'absent')
def testMethodReset(self):
con = self.db.db
@@ -436,6 +441,9 @@
self.assertRaises(TypeError, f)
self.assertRaises(TypeError, f, None)
self.assertRaises(TypeError, f, 42)
+ self.assertRaises(TypeError, f, '')
+ self.assertRaises(TypeError, f, [])
+ self.assertRaises(TypeError, f, [''])
self.assertRaises(pg.ProgrammingError, f, 'this_does_not_exist')
r = f('standard_conforming_strings')
self.assertEqual(r, 'on')
@@ -487,6 +495,12 @@
self.assertRaises(TypeError, f)
self.assertRaises(TypeError, f, None)
self.assertRaises(TypeError, f, 42)
+ self.assertRaises(TypeError, f, '')
+ self.assertRaises(TypeError, f, [])
+ self.assertRaises(TypeError, f, [''])
+ self.assertRaises(ValueError, f, 'all', 'invalid')
+ self.assertRaises(ValueError, f, {
+ 'invalid1': 'value1', 'invalid2': 'value2'}, 'value')
self.assertRaises(pg.ProgrammingError, f, 'this_does_not_exist')
f('standard_conforming_strings', 'off')
self.assertEqual(g('standard_conforming_strings'), 'off')
@@ -770,6 +784,12 @@
self.db.get_attnames, 'does_not_exist')
self.assertRaises(pg.ProgrammingError,
self.db.get_attnames, 'has.too.many.dots')
+ attributes = self.db.get_attnames('test')
+ self.assertIsInstance(attributes, dict)
+ self.assertEqual(attributes, dict(
+ i2='int', i4='int', i8='int', d='num',
+ f4='float', f8='float', m='money',
+ v4='text', c4='text', t='text'))
for table in ('attnames_test_table', 'test table for attnames'):
self.db.query('drop table if exists "%s"' % table)
self.db.query('create table "%s" ('
Modified: trunk/docs/contents/pg/db_wrapper.rst
==============================================================================
--- trunk/docs/contents/pg/db_wrapper.rst Sun Jan 17 11:32:18 2016
(r762)
+++ trunk/docs/contents/pg/db_wrapper.rst Sun Jan 17 16:01:04 2016
(r763)
@@ -257,7 +257,11 @@
This rolls back the current transaction and causes all the updates
made by the transaction to be discarded.
-.. versionadded:: 4.1
+.. method:: DB.abort()
+
+ This is the same as the :meth:`DB.rollback` method.
+
+.. versionadded:: 4.2
.. method:: DB.savepoint(name)
Modified: trunk/pg.py
==============================================================================
--- trunk/pg.py Sun Jan 17 11:32:18 2016 (r762)
+++ trunk/pg.py Sun Jan 17 16:01:04 2016 (r763)
@@ -373,15 +373,11 @@
def _prepare_param(self, value, typ, params):
"""Prepare and add a parameter to the list."""
if value is not None and typ != 'text':
+ prepare = self._prepare_funcs[typ]
try:
- prepare = self._prepare_funcs[typ]
- except KeyError:
- pass
- else:
- try:
- value = prepare(self, value)
- except ValueError:
- return value
+ value = prepare(self, value)
+ except ValueError:
+ return value
params.append(value)
return '$%d' % len(params)
@@ -469,6 +465,8 @@
qstr += ' TO ' + name
return self.query(qstr)
+ abort = rollback
+
def savepoint(self, name):
"""Define a new savepoint within the current transaction."""
return self.query('SAVEPOINT ' + name)
@@ -723,8 +721,6 @@
'::regtype' if self._regtypes else '',
self._prepare_qualified_param(table, 1))
names = self.db.query(q, (table,)).getresult()
- if not names:
- raise KeyError('Table %s does not exist' % table)
if not self._regtypes:
names = ((name, _simpletype(typ)) for name, typ in names)
names = OrderedDict(names)
@@ -787,7 +783,7 @@
if keyname == 'oid':
if isinstance(row, dict):
if qoid not in row:
- raise _db_error('%s not in row' % qoid)
+ raise _prg_error('%s not in row' % qoid)
else:
row = {qoid: row}
what = '*'
@@ -854,9 +850,7 @@
self._escape_qualified_name(table), names, values, ret)
self._do_debug(q, params)
q = self.db.query(q, params)
- res = q.dictresult()
- if not res:
- raise _int_error('Insert operation did not return new values')
+ res = q.dictresult() # this will always return a row
for n, value in res[0].items():
if n == 'oid':
n = _oid_key(table)
@@ -879,8 +873,7 @@
# Note that we only accept oid key from named args for safety.
qoid = _oid_key(table)
if 'oid' in kw:
- kw[qoid] = kw['oid']
- del kw['oid']
+ kw[qoid] = kw.pop('oid')
if row is None:
row = {}
row.update(kw)
@@ -990,10 +983,7 @@
raise _prg_error('Table %s has no primary key' % table)
keyname = [keyname] if isinstance(
keyname, basestring) else sorted(keyname)
- try:
- target = ', '.join(col(k) for k in keyname)
- except KeyError:
- raise _prg_error('Upsert operation needs primary key or oid')
+ target = ', '.join(col(k) for k in keyname)
update = []
keyname = set(keyname)
keyname.add('oid')
@@ -1004,7 +994,7 @@
if not isinstance(value, basestring):
value = 'excluded.%s' % col(n)
update.append('%s = %s' % (col(n), value))
- if not values and not update:
+ if not values:
return row
do = 'update set %s' % ', '.join(update) if update else 'nothing'
ret = 'oid, *' if 'oid' in attnames else '*'
@@ -1021,15 +1011,13 @@
'Upsert operation is not supported by PostgreSQL version')
raise # re-raise original error
res = q.dictresult()
- if res: # may be empty with "do nothing"
+ if update: # may be empty with "do nothing"
for n, value in res[0].items():
if n == 'oid':
n = _oid_key(table)
- elif attnames.get(n) == 'bytea':
+ elif attnames.get(n) == 'bytea' and value is not None:
value = self.unescape_bytea(value)
row[n] = value
- elif update:
- raise _int_error('Upsert operation did not return new values')
else:
self.get(table, row)
return row
@@ -1073,8 +1061,7 @@
# Note that we only accept oid key from named args for safety.
qoid = _oid_key(table)
if 'oid' in kw:
- kw[qoid] = kw['oid']
- del kw['oid']
+ kw[qoid] = kw.pop('oid')
if row is None:
row = {}
row.update(kw)
Modified: trunk/tests/test_classic_dbwrapper.py
==============================================================================
--- trunk/tests/test_classic_dbwrapper.py Sun Jan 17 11:32:18 2016
(r762)
+++ trunk/tests/test_classic_dbwrapper.py Sun Jan 17 16:01:04 2016
(r763)
@@ -92,6 +92,7 @@
def testAllDBAttributes(self):
attributes = [
+ 'abort',
'begin',
'cancel', 'clear', 'close', 'commit',
'db', 'dbname', 'debug', 'delete',
@@ -229,8 +230,12 @@
pass
else:
self.fail('Reset should give an error for a closed connection')
+ self.assertIsNone(self.db.db)
self.assertRaises(pg.InternalError, self.db.close)
self.assertRaises(pg.InternalError, self.db.query, 'select 1')
+ self.assertRaises(pg.InternalError, getattr, self.db, 'status')
+ self.assertRaises(pg.InternalError, getattr, self.db, 'error')
+ self.assertRaises(pg.InternalError, getattr, self.db, 'absent')
def testMethodReset(self):
con = self.db.db
@@ -415,6 +420,9 @@
self.assertRaises(TypeError, f)
self.assertRaises(TypeError, f, None)
self.assertRaises(TypeError, f, 42)
+ self.assertRaises(TypeError, f, '')
+ self.assertRaises(TypeError, f, [])
+ self.assertRaises(TypeError, f, [''])
self.assertRaises(pg.ProgrammingError, f, 'this_does_not_exist')
r = f('standard_conforming_strings')
self.assertEqual(r, 'on')
@@ -466,6 +474,12 @@
self.assertRaises(TypeError, f)
self.assertRaises(TypeError, f, None)
self.assertRaises(TypeError, f, 42)
+ self.assertRaises(TypeError, f, '')
+ self.assertRaises(TypeError, f, [])
+ self.assertRaises(TypeError, f, [''])
+ self.assertRaises(ValueError, f, 'all', 'invalid')
+ self.assertRaises(ValueError, f, {
+ 'invalid1': 'value1', 'invalid2': 'value2'}, 'value')
self.assertRaises(pg.ProgrammingError, f, 'this_does_not_exist')
f('standard_conforming_strings', 'off')
self.assertEqual(g('standard_conforming_strings'), 'off')
@@ -809,6 +823,12 @@
self.db.get_attnames, 'does_not_exist')
self.assertRaises(pg.ProgrammingError,
self.db.get_attnames, 'has.too.many.dots')
+ r = get_attnames('test')
+ self.assertIsInstance(r, dict)
+ self.assertEqual(r, dict(
+ i2='int', i4='int', i8='int', d='num',
+ f4='float', f8='float', m='money',
+ v4='text', c4='text', t='text'))
query = self.db.query
query("drop table if exists test_table")
self.addCleanup(query, "drop table test_table")
@@ -860,12 +880,15 @@
query("create table test_table("
" n int, alpha smallint, beta bool,"
" gamma char(5), tau text, v varchar(3))")
- self.db.use_regtypes(True)
+ use_regtypes = self.db.use_regtypes
+ regtypes = use_regtypes()
+ self.assertFalse(regtypes)
+ use_regtypes(True)
try:
r = get_attnames("test_table")
self.assertIsInstance(r, dict)
finally:
- self.db.use_regtypes(False)
+ use_regtypes(regtypes)
self.assertEqual(r, dict(
n='integer', alpha='smallint', beta='boolean',
gamma='character', tau='text', v='character varying'))
@@ -874,22 +897,27 @@
get_attnames = self.db.get_attnames
query = self.db.query
query("drop table if exists test_table")
- self.addCleanup(query, "drop table if exists test_table")
+ self.addCleanup(query, "drop table test_table")
query("create table test_table(col int)")
r = get_attnames("test_table")
self.assertIsInstance(r, dict)
self.assertEqual(r, dict(col='int'))
- query("drop table test_table")
- query("create table test_table(col text)")
+ query("alter table test_table alter column col type text")
+ query("alter table test_table add column col2 int")
r = get_attnames("test_table")
self.assertEqual(r, dict(col='int'))
r = get_attnames("test_table", flush=True)
+ self.assertEqual(r, dict(col='text', col2='int'))
+ query("alter table test_table drop column col2")
+ r = get_attnames("test_table")
+ self.assertEqual(r, dict(col='text', col2='int'))
+ r = get_attnames("test_table", flush=True)
self.assertEqual(r, dict(col='text'))
- query("drop table test_table")
+ query("alter table test_table drop column col")
r = get_attnames("test_table")
self.assertEqual(r, dict(col='text'))
- self.assertRaises(pg.ProgrammingError,
- get_attnames, "test_table", flush=True)
+ r = get_attnames("test_table", flush=True)
+ self.assertEqual(r, dict())
def testGetAttnamesIsOrdered(self):
get_attnames = self.db.get_attnames
@@ -935,6 +963,7 @@
query('insert into "%s" values('"%d, '%s')"
% (table, n + 1, t))
self.assertRaises(pg.ProgrammingError, get, table, 2)
+ self.assertRaises(pg.ProgrammingError, get, table, {}, 'oid')
r = get(table, 2, 'n')
oid_table = 'oid(%s)' % table
self.assertIn(oid_table, r)
@@ -1160,6 +1189,31 @@
self.assertEqual(data, expect)
query('delete from "%s"' % table)
+ def testInsertWithOid(self):
+ insert = self.db.insert
+ query = self.db.query
+ query("drop table if exists test_table")
+ self.addCleanup(query, "drop table test_table")
+ query("create table test_table (n int) with oids")
+ r = insert('test_table', n=1)
+ self.assertIsInstance(r, dict)
+ self.assertEqual(r['n'], 1)
+ qoid = 'oid(test_table)'
+ self.assertIn(qoid, r)
+ r = insert('test_table', n=2, oid='invalid')
+ self.assertIsInstance(r, dict)
+ self.assertEqual(r['n'], 2)
+ r['n'] = 3
+ r = insert('test_table', r)
+ self.assertIsInstance(r, dict)
+ self.assertEqual(r['n'], 3)
+ r = insert('test_table', r, n=4)
+ self.assertIsInstance(r, dict)
+ self.assertEqual(r['n'], 4)
+ q = 'select n from test_table order by 1 limit 5'
+ r = query(q).getresult()
+ self.assertEqual(r, [(1,), (2,), (3,), (4,)])
+
def testInsertWithQuotedNames(self):
insert = self.db.insert
query = self.db.query
@@ -1185,6 +1239,8 @@
def testUpdate(self):
update = self.db.update
query = self.db.query
+ self.assertRaises(pg.ProgrammingError, update,
+ 'test', i2=2, i4=4, i8=8)
table = 'update_test_table'
query('drop table if exists "%s"' % table)
self.addCleanup(query, 'drop table "%s"' % table)
@@ -1202,6 +1258,40 @@
r = query(q).getresult()[0][0]
self.assertEqual(r, 'u')
+ def testUpdateWithOid(self):
+ update = self.db.update
+ get = self.db.get
+ query = self.db.query
+ query("drop table if exists test_table")
+ self.addCleanup(query, "drop table test_table")
+ query("create table test_table (n int) with oids")
+ query("insert into test_table values (1)")
+ r = get('test_table', 1, 'n')
+ self.assertIsInstance(r, dict)
+ self.assertEqual(r['n'], 1)
+ r['n'] = 2
+ r = update('test_table', r)
+ self.assertIsInstance(r, dict)
+ self.assertEqual(r['n'], 2)
+ qoid = 'oid(test_table)'
+ self.assertIn(qoid, r)
+ r['n'] = 3
+ r = update('test_table', r, oid=r.pop(qoid))
+ self.assertIsInstance(r, dict)
+ self.assertEqual(r['n'], 3)
+ r.pop(qoid)
+ self.assertRaises(pg.ProgrammingError, update, 'test_table', r)
+ r = get('test_table', 3, 'n')
+ self.assertIsInstance(r, dict)
+ self.assertEqual(r['n'], 3)
+ r.pop('n')
+ r = update('test_table', r)
+ r.pop(qoid)
+ self.assertEqual(r, {})
+ q = 'select n from test_table limit 2'
+ r = query(q).getresult()
+ self.assertEqual(r, [(3,)])
+
def testUpdateWithCompositeKey(self):
update = self.db.update
query = self.db.query
@@ -1278,6 +1368,8 @@
def testUpsert(self):
upsert = self.db.upsert
query = self.db.query
+ self.assertRaises(pg.ProgrammingError, upsert,
+ 'test', i2=2, i4=4, i8=8)
table = 'upsert_test_table'
query('drop table if exists "%s"' % table)
self.addCleanup(query, 'drop table "%s"' % table)
@@ -1343,6 +1435,10 @@
self.assertEqual(r['t'], 'x2')
r = query(q).getresult()
self.assertEqual(r, [(1, 'x2'), (2, 'y3')])
+ # not existing columns and oid parameter should be ignored
+ s = dict(m=3, u='z')
+ r = upsert(table, s, oid='invalid')
+ self.assertIs(r, s)
def testUpsertWithCompositeKey(self):
upsert = self.db.upsert
@@ -1449,21 +1545,24 @@
clear = self.db.clear
query = self.db.query
f = False if pg.get_bool() else 'f'
+ r = clear('test')
+ result = dict(
+ i2=0, i4=0, i8=0, d=0, f4=0, f8=0, m=0, v4='', c4='', t='')
+ self.assertEqual(r, result)
table = 'clear_test_table'
query('drop table if exists "%s"' % table)
self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
- "n integer, b boolean, d date, t text)" % table)
+ "n integer, b boolean, d date, t text) with oids" % table)
r = clear(table)
- result = {'n': 0, 'b': f, 'd': '', 't': ''}
+ result = dict(n=0, b=f, d='', t='')
self.assertEqual(r, result)
r['a'] = r['n'] = 1
r['d'] = r['t'] = 'x'
r['b'] = 't'
r['oid'] = long(1)
r = clear(table, r)
- result = {'a': 1, 'n': 0, 'b': f, 'd': '', 't': '',
- 'oid': long(1)}
+ result = dict(a=1, n=0, b=f, d='', t='', oid=long(1))
self.assertEqual(r, result)
def testClearWithQuotedNames(self):
@@ -1484,6 +1583,8 @@
def testDelete(self):
delete = self.db.delete
query = self.db.query
+ self.assertRaises(pg.ProgrammingError, delete,
+ 'test', dict(i2=2, i4=4, i8=8))
table = 'delete_test_table'
query('drop table if exists "%s"' % table)
self.addCleanup(query, 'drop table "%s"' % table)
@@ -1512,6 +1613,52 @@
s = delete(table, r)
self.assertEqual(s, 0)
self.assertRaises(pg.DatabaseError, self.db.get, table, 2, 'n')
+ # not existing columns and oid parameter should be ignored
+ r.update(m=3, u='z', oid='invalid')
+ s = delete(table, r)
+ self.assertEqual(s, 0)
+
+ def testDeleteWithOid(self):
+ delete = self.db.delete
+ get = self.db.get
+ query = self.db.query
+ query("drop table if exists test_table")
+ self.addCleanup(query, "drop table test_table")
+ query("create table test_table (n int) with oids")
+ query("insert into test_table values (1)")
+ query("insert into test_table values (2)")
+ query("insert into test_table values (3)")
+ r = dict(n=3)
+ self.assertRaises(pg.ProgrammingError, delete, 'test_table', r)
+ r = get('test_table', 1, 'n')
+ self.assertIsInstance(r, dict)
+ self.assertEqual(r['n'], 1)
+ qoid = 'oid(test_table)'
+ self.assertIn(qoid, r)
+ oid = r[qoid]
+ self.assertIsInstance(oid, int)
+ s = delete('test_table', r)
+ self.assertEqual(s, 1)
+ s = delete('test_table', r)
+ self.assertEqual(s, 0)
+ r = get('test_table', 2, 'n')
+ self.assertIsInstance(r, dict)
+ self.assertEqual(r['n'], 2)
+ qoid = 'oid(test_table)'
+ self.assertIn(qoid, r)
+ oid = r[qoid]
+ self.assertIsInstance(oid, int)
+ r['oid'] = r.pop(qoid)
+ self.assertRaises(pg.ProgrammingError, delete, 'test_table', r)
+ s = delete('test_table', r, oid=oid)
+ self.assertEqual(s, 1)
+ s = delete('test_table', r)
+ self.assertEqual(s, 0)
+ s = delete('test_table', r, n=3)
+ self.assertEqual(s, 0)
+ q = 'select n from test_table order by 1 limit 3'
+ r = query(q).getresult()
+ self.assertEqual(r, [(3,)])
def testDeleteWithCompositeKey(self):
query = self.db.query
@@ -1814,6 +1961,19 @@
r = [r[0] for r in query(
"select * from test_table order by 1").getresult()]
self.assertEqual(r, [1, 2, 5, 7, 9])
+ self.db.begin(mode='read only')
+ self.assertRaises(pg.ProgrammingError,
+ query, "insert into test_table values (0)")
+ self.db.rollback()
+ self.db.start(mode='Read Only')
+ self.assertRaises(pg.ProgrammingError,
+ query, "insert into test_table values (0)")
+ self.db.abort()
+
+ def testTransactionAliases(self):
+ self.assertEqual(self.db.begin, self.db.start)
+ self.assertEqual(self.db.commit, self.db.end)
+ self.assertEqual(self.db.rollback, self.db.abort)
def testContextManager(self):
query = self.db.query
@@ -1925,6 +2085,33 @@
self.assertIsInstance(r, bytes)
self.assertEqual(r, s)
+ def testUpsertBytea(self):
+ query = self.db.query
+ query('drop table if exists bytea_test')
+ self.addCleanup(query, 'drop table bytea_test')
+ query('create table bytea_test (n smallint primary key, data bytea)')
+ s = b"It's all \\ kinds \x00 of\r nasty \xff stuff!\n"
+ r = dict(n=7, data=s)
+ try:
+ r = self.db.upsert('bytea_test', r)
+ except pg.ProgrammingError as error:
+ if self.db.server_version < 90500:
+ self.skipTest('database does not support upsert')
+ self.fail(str(error))
+ self.assertIsInstance(r, dict)
+ self.assertIn('n', r)
+ self.assertEqual(r['n'], 7)
+ self.assertIn('data', r)
+ self.assertIsInstance(r['data'], bytes)
+ self.assertEqual(r['data'], s)
+ r['data'] = None
+ r = self.db.upsert('bytea_test', r)
+ self.assertIsInstance(r, dict)
+ self.assertIn('n', r)
+ self.assertEqual(r['n'], 7)
+ self.assertIn('data', r)
+ self.assertIsNone(r['data'], bytes)
+
def testNotificationHandler(self):
# the notification handler itself is tested separately
f = self.db.notification_handler
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql