Author: cito
Date: Thu Jan 14 15:26:30 2016
New Revision: 744
Log:
Use the unittest cleanup feature (addCleanup/doCleanups) in the unit tests
Modified:
trunk/tests/test_classic_connection.py
trunk/tests/test_classic_dbwrapper.py
Modified: trunk/tests/test_classic_connection.py
==============================================================================
--- trunk/tests/test_classic_connection.py Thu Jan 14 12:32:01 2016 (r743)
+++ trunk/tests/test_classic_connection.py Thu Jan 14 15:26:30 2016 (r744)
@@ -264,6 +264,7 @@
self.c = connect()
def tearDown(self):
+ self.doCleanups()
self.c.close()
def testClassName(self):
@@ -506,6 +507,7 @@
def testQuery(self):
query = self.c.query
query("drop table if exists test_table")
+ self.addCleanup(query, "drop table test_table")
q = "create table test_table (n integer) with oids"
r = query(q)
self.assertIsNone(r)
@@ -536,7 +538,6 @@
r = query(q)
self.assertIsInstance(r, str)
self.assertEqual(r, '5')
- query("drop table test_table")
class TestUnicodeQueries(unittest.TestCase):
@@ -1204,6 +1205,7 @@
self.c = connect()
def tearDown(self):
+ self.doCleanups()
self.c.close()
def testGetNotify(self):
@@ -1250,31 +1252,29 @@
self.assertIs(self.c.get_notice_receiver(), r)
def testNoticeReceiver(self):
+ self.addCleanup(self.c.query, 'drop function bilbo_notice();')
self.c.query('''create function bilbo_notice() returns void AS $$
begin
raise warning 'Bilbo was here!';
end;
$$ language plpgsql''')
- try:
- received = {}
+ received = {}
- def notice_receiver(notice):
- for attr in dir(notice):
- if attr.startswith('__'):
- continue
- value = getattr(notice, attr)
- if isinstance(value, str):
- value = value.replace('WARNUNG', 'WARNING')
- received[attr] = value
-
- self.c.set_notice_receiver(notice_receiver)
- self.c.query('''select bilbo_notice()''')
- self.assertEqual(received, dict(
- pgcnx=self.c, message='WARNING: Bilbo was here!\n',
- severity='WARNING', primary='Bilbo was here!',
- detail=None, hint=None))
- finally:
- self.c.query('''drop function bilbo_notice();''')
+ def notice_receiver(notice):
+ for attr in dir(notice):
+ if attr.startswith('__'):
+ continue
+ value = getattr(notice, attr)
+ if isinstance(value, str):
+ value = value.replace('WARNUNG', 'WARNING')
+ received[attr] = value
+
+ self.c.set_notice_receiver(notice_receiver)
+ self.c.query('select bilbo_notice()')
+ self.assertEqual(received, dict(
+ pgcnx=self.c, message='WARNING: Bilbo was here!\n',
+ severity='WARNING', primary='Bilbo was here!',
+ detail=None, hint=None))
class TestConfigFunctions(unittest.TestCase):
Modified: trunk/tests/test_classic_dbwrapper.py
==============================================================================
--- trunk/tests/test_classic_dbwrapper.py Thu Jan 14 12:32:01 2016 (r743)
+++ trunk/tests/test_classic_dbwrapper.py Thu Jan 14 15:26:30 2016 (r744)
@@ -309,6 +309,7 @@
query('set bytea_output=hex')
def tearDown(self):
+ self.doCleanups()
self.db.close()
def testClassName(self):
@@ -415,6 +416,7 @@
get_attnames = self.db.get_attnames
query = self.db.query
query("drop table if exists test_table")
+ self.addCleanup(query, "drop table test_table")
query("create table test_table("
" n int, alpha smallint, beta bool,"
" gamma char(5), tau text, v varchar(3))")
@@ -423,13 +425,13 @@
self.assertEquals(r, dict(
n='int', alpha='int', beta='bool',
gamma='text', tau='text', v='text'))
- query("drop table test_table")
def testGetAttnamesWithQuotes(self):
get_attnames = self.db.get_attnames
query = self.db.query
table = 'test table for get_attnames()'
query('drop table if exists "%s"' % table)
+ self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s"('
'"Prime!" smallint,'
'"much space" integer, "Questions?" text)' % table)
@@ -437,12 +439,12 @@
self.assertIsInstance(r, dict)
self.assertEquals(r, {
'Prime!': 'int', 'much space': 'int', 'Questions?': 'text'})
- query('drop table "%s"' % table)
def testGetAttnamesWithRegtypes(self):
get_attnames = self.db.get_attnames
query = self.db.query
query("drop table if exists test_table")
+ self.addCleanup(query, "drop table test_table")
query("create table test_table("
" n int, alpha smallint, beta bool,"
" gamma char(5), tau text, v varchar(3))")
@@ -455,12 +457,12 @@
self.assertEquals(r, dict(
n='integer', alpha='smallint', beta='boolean',
gamma='character', tau='text', v='character varying'))
- query("drop table test_table")
def testGetAttnamesIsCached(self):
get_attnames = self.db.get_attnames
query = self.db.query
query("drop table if exists test_table")
+ self.addCleanup(query, "drop table if exists test_table")
query("create table test_table(col int)")
r = get_attnames("test_table")
self.assertIsInstance(r, dict)
@@ -481,6 +483,7 @@
get_attnames = self.db.get_attnames
query = self.db.query
query("drop table if exists test_table")
+ self.addCleanup(query, "drop table test_table")
query("create table test_table("
" n int, alpha smallint, v varchar(3),"
" gamma char(5), tau text, beta bool)")
@@ -489,7 +492,6 @@
self.assertEquals(r, OrderedDict([
('n', 'int'), ('alpha', 'int'), ('v', 'text'),
('gamma', 'text'), ('tau', 'text'), ('beta', 'bool')]))
- query("drop table test_table")
if OrderedDict is dict:
self.skipTest('OrderedDict is not supported')
r = ' '.join(list(r.keys()))
@@ -498,6 +500,7 @@
def testQuery(self):
query = self.db.query
query("drop table if exists test_table")
+ self.addCleanup(query, "drop table test_table")
q = "create table test_table (n integer) with oids"
r = query(q)
self.assertIsNone(r)
@@ -527,7 +530,6 @@
r = query(q)
self.assertIsInstance(r, str)
self.assertEqual(r, '5')
- query("drop table test_table")
def testMultipleQueries(self):
self.assertEqual(self.db.query(
@@ -538,6 +540,7 @@
def testQueryWithParams(self):
query = self.db.query
query("drop table if exists test_table")
+ self.addCleanup(query, "drop table test_table")
q = "create table test_table (n1 integer, n2 integer) with oids"
query(q)
q = "insert into test_table values ($1, $2)"
@@ -561,7 +564,6 @@
q = "delete from test_table where n2!=$1"
r = query(q, 4)
self.assertEqual(r, '3')
- query("drop table test_table")
def testEmptyQuery(self):
self.assertRaises(ValueError, self.db.query, '')
@@ -578,6 +580,7 @@
for t in ('pkeytest', 'primary key test'):
for n in range(7):
query('drop table if exists "%s%d"' % (t, n))
+ self.addCleanup(query, 'drop table "%s%d"' % (t, n))
query('create table "%s0" ('
"a smallint)" % t)
query('create table "%s1" ('
@@ -619,8 +622,6 @@
self.assertEqual(pkey('%s1' % t), 'b')
# we get the changed primary key when the cache is flushed
self.assertEqual(pkey('%s1' % t, flush=True), 'x')
- for n in range(7):
- query('drop table "%s%d"' % (t, n))
def testGetDatabases(self):
databases = self.db.get_databases()
@@ -692,6 +693,7 @@
self.db.get_attnames, 'has.too.many.dots')
for table in ('attnames_test_table', 'test table for attnames'):
self.db.query('drop table if exists "%s"' % table)
+ self.addCleanup(self.db.query, 'drop table "%s"' % table)
self.db.query('create table "%s" ('
'a smallint, b integer, c bigint, '
'e numeric, f float, f2 double precision, m money, '
@@ -706,7 +708,6 @@
'u': 'text', 't': 'text', 'v': 'text',
'y': 'int', 'x': 'int', 'z': 'int', 'oid': 'int'}
self.assertEqual(attributes, result)
- self.db.query('drop table "%s"' % table)
def testHasTablePrivilege(self):
can = self.db.has_table_privilege
@@ -727,6 +728,7 @@
query = self.db.query
table = 'get_test_table'
query('drop table if exists "%s"' % table)
+ self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
"n integer, t text) with oids" % table)
for n, t in enumerate('xyz'):
@@ -760,22 +762,22 @@
self.assertEqual(get(table, r)['t'], 'z')
r['n'] = 2
self.assertEqual(get(table, r)['t'], 'y')
- query('drop table "%s"' % table)
def testGetWithCompositeKey(self):
get = self.db.get
query = self.db.query
table = 'get_test_table_1'
query('drop table if exists "%s"' % table)
+ self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
"n integer, t text, primary key (n))" % table)
for n, t in enumerate('abc'):
query('insert into "%s" values('
"%d, '%s')" % (table, n + 1, t))
self.assertEqual(get(table, 2)['t'], 'b')
- query('drop table "%s"' % table)
table = 'get_test_table_2'
query('drop table if exists "%s"' % table)
+ self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
"n integer, m integer, t text, primary key (n, m))" % table)
for n in range(3):
@@ -789,13 +791,13 @@
self.assertEqual(r['t'], 'b')
r = get(table, dict(n=3, m=2), frozenset(['n', 'm']))
self.assertEqual(r['t'], 'f')
- query('drop table "%s"' % table)
def testGetWithQuotedNames(self):
get = self.db.get
query = self.db.query
table = 'test table for get()'
query('drop table if exists "%s"' % table)
+ self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
'"Prime!" smallint primary key,'
'"much space" integer, "Questions?" text)' % table)
@@ -806,7 +808,6 @@
self.assertEqual(r['Prime!'], 17)
self.assertEqual(r['much space'], 1001)
self.assertEqual(r['Questions?'], 'No!')
- query('drop table "%s"' % table)
def testGetFromView(self):
self.db.query('delete from test where i4=14')
@@ -820,6 +821,7 @@
get = self.db.get
query = self.db.query
query("drop table if exists test_students")
+ self.addCleanup(query, "drop table test_students")
query("create table test_students (firstname varchar primary key,"
" nickname varchar, grade char(2))")
query("insert into test_students values ("
@@ -853,7 +855,6 @@
r = query(q).getresult()
self.assertEqual(len(r), 3)
self.assertEqual(r[1][2], 'D-')
- query('drop table test_students')
def testInsert(self):
insert = self.db.insert
@@ -862,6 +863,7 @@
decimal = pg.get_decimal()
table = 'insert_test_table'
query('drop table if exists "%s"' % table)
+ self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
"i2 smallint, i4 integer, i8 bigint,"
" d numeric, f4 real, f8 double precision, m money,"
@@ -957,13 +959,13 @@
if item[0] in expect)
self.assertEqual(data, expect)
query('delete from "%s"' % table)
- query('drop table "%s"' % table)
def testInsertWithQuotedNames(self):
insert = self.db.insert
query = self.db.query
table = 'test table for insert()'
query('drop table if exists "%s"' % table)
+ self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
'"Prime!" smallint primary key,'
'"much space" integer, "Questions?" text)' % table)
@@ -979,13 +981,13 @@
self.assertEqual(r['Prime!'], 11)
self.assertEqual(r['much space'], 2002)
self.assertEqual(r['Questions?'], 'What?')
- query('drop table "%s"' % table)
def testUpdate(self):
update = self.db.update
query = self.db.query
table = 'update_test_table'
query('drop table if exists "%s"' % table)
+ self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
"n integer, t text) with oids" % table)
for n, t in enumerate('xyz'):
@@ -999,13 +1001,13 @@
q = 'select t from "%s" where n=2' % table
r = query(q).getresult()[0][0]
self.assertEqual(r, 'u')
- query('drop table "%s"' % table)
def testUpdateWithCompositeKey(self):
update = self.db.update
query = self.db.query
table = 'update_test_table_1'
query('drop table if exists "%s"' % table)
+ self.addCleanup(query, 'drop table if exists "%s"' % table)
query('create table "%s" ('
"n integer, t text, primary key (n))" % table)
for n, t in enumerate('abc'):
@@ -1048,13 +1050,13 @@
q = 'select t from "%s" where n=2 order by m' % table
r = [r[0] for r in query(q).getresult()]
self.assertEqual(r, ['c', 'x'])
- query('drop table "%s"' % table)
def testUpdateWithQuotedNames(self):
update = self.db.update
query = self.db.query
table = 'test table for update()'
query('drop table if exists "%s"' % table)
+ self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
'"Prime!" smallint primary key,'
'"much space" integer, "Questions?" text)' % table)
@@ -1072,13 +1074,13 @@
self.assertEqual(r['Prime!'], 13)
self.assertEqual(r['much space'], 7007)
self.assertEqual(r['Questions?'], 'When?')
- query('drop table "%s"' % table)
def testUpsert(self):
upsert = self.db.upsert
query = self.db.query
table = 'upsert_test_table'
query('drop table if exists "%s"' % table)
+ self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
"n integer primary key, t text) with oids" % table)
s = dict(n=1, t='x')
@@ -1141,13 +1143,13 @@
self.assertEqual(r['t'], 'x2')
r = query(q).getresult()
self.assertEqual(r, [(1, 'x2'), (2, 'y3')])
- query('drop table "%s"' % table)
def testUpsertWithCompositeKey(self):
upsert = self.db.upsert
query = self.db.query
table = 'upsert_test_table_2'
query('drop table if exists "%s"' % table)
+ self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
"n integer, m integer, t text, primary key (n, m))" % table)
s = dict(n=1, m=2, t='x')
@@ -1210,13 +1212,13 @@
self.assertEqual(r['t'], 'nm')
r = query(q).getresult()
self.assertEqual(r, [(1, 2, 'x'), (1, 3, 'nm'), (2, 3, 'y')])
- query('drop table "%s"' % table)
def testUpsertWithQuotedNames(self):
upsert = self.db.upsert
query = self.db.query
table = 'test table for upsert()'
query('drop table if exists "%s"' % table)
+ self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
'"Prime!" smallint primary key,'
'"much space" integer, "Questions?" text)' % table)
@@ -1249,6 +1251,7 @@
f = False if pg.get_bool() else 'f'
table = 'clear_test_table'
query('drop table if exists "%s"' % table)
+ self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
"n integer, b boolean, d date, t text)" % table)
r = clear(table)
@@ -1262,13 +1265,13 @@
result = {'a': 1, 'n': 0, 'b': f, 'd': '', 't': '',
'oid': long(1)}
self.assertEqual(r, result)
- query('drop table "%s"' % table)
def testClearWithQuotedNames(self):
clear = self.db.clear
query = self.db.query
table = 'test table for clear()'
query('drop table if exists "%s"' % table)
+ self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
'"Prime!" smallint primary key,'
'"much space" integer, "Questions?" text)' % table)
@@ -1277,13 +1280,13 @@
self.assertEqual(r['Prime!'], 0)
self.assertEqual(r['much space'], 0)
self.assertEqual(r['Questions?'], '')
- query('drop table "%s"' % table)
def testDelete(self):
delete = self.db.delete
query = self.db.query
table = 'delete_test_table'
query('drop table if exists "%s"' % table)
+ self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
"n integer, t text) with oids" % table)
for n, t in enumerate('xyz'):
@@ -1309,12 +1312,12 @@
s = delete(table, r)
self.assertEqual(s, 0)
self.assertRaises(pg.DatabaseError, self.db.get, table, 2, 'n')
- query('drop table "%s"' % table)
def testDeleteWithCompositeKey(self):
query = self.db.query
table = 'delete_test_table_1'
query('drop table if exists "%s"' % table)
+ self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
"n integer, t text, primary key (n))" % table)
for n, t in enumerate('abc'):
@@ -1330,9 +1333,9 @@
r = query('select t from "%s" where n=3' % table
).getresult()[0][0]
self.assertEqual(r, 'c')
- query('drop table "%s"' % table)
table = 'delete_test_table_2'
query('drop table if exists "%s"' % table)
+ self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
"n integer, m integer, t text, primary key (n, m))" % table)
for n in range(3):
@@ -1354,13 +1357,13 @@
r = [r[0] for r in query('select t from "%s" where n=3'
' order by m' % table).getresult()]
self.assertEqual(r, ['f'])
- query('drop table "%s"' % table)
def testDeleteWithQuotedNames(self):
delete = self.db.delete
query = self.db.query
table = 'test table for delete()'
query('drop table if exists "%s"' % table)
+ self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
'"Prime!" smallint primary key,'
'"much space" integer, "Questions?" text)' % table)
@@ -1376,11 +1379,11 @@
self.assertEqual(r, 1)
r = query('select count(*) from "%s"' % table).getresult()
self.assertEqual(r[0][0], 0)
- query('drop table "%s"' % table)
def testTransaction(self):
query = self.db.query
query("drop table if exists test_table")
+ self.addCleanup(query, "drop table test_table")
query("create table test_table (n integer)")
self.db.begin()
query("insert into test_table values (1)")
@@ -1409,11 +1412,11 @@
r = [r[0] for r in query(
"select * from test_table order by 1").getresult()]
self.assertEqual(r, [1, 2, 5, 7, 9])
- query("drop table test_table")
def testContextManager(self):
query = self.db.query
query("drop table if exists test_table")
+ self.addCleanup(query, "drop table test_table")
query("create table test_table (n integer check(n>0))")
with self.db:
query("insert into test_table values (1)")
@@ -1438,11 +1441,11 @@
r = [r[0] for r in query(
"select * from test_table order by 1").getresult()]
self.assertEqual(r, [1, 2, 5, 7])
- query("drop table test_table")
def testBytea(self):
query = self.db.query
query('drop table if exists bytea_test')
+ self.addCleanup(query, 'drop table bytea_test')
query('create table bytea_test (n smallint primary key, data bytea)')
s = b"It's all \\ kinds \x00 of\r nasty \xff stuff!\n"
r = self.db.escape_bytea(s)
@@ -1457,11 +1460,11 @@
r = self.db.unescape_bytea(r)
self.assertIsInstance(r, bytes)
self.assertEqual(r, s)
- query('drop table bytea_test')
def testInsertUpdateGetBytea(self):
query = self.db.query
query('drop table if exists bytea_test')
+ self.addCleanup(query, 'drop table bytea_test')
query('create table bytea_test (n smallint primary key, data bytea)')
# insert null value
r = self.db.insert('bytea_test', n=0, data=None)
@@ -1519,7 +1522,6 @@
r = r['data']
self.assertIsInstance(r, bytes)
self.assertEqual(r, s)
- query('drop table bytea_test')
def testDebugWithCallable(self):
if debug:
@@ -1614,6 +1616,7 @@
self.db.query("set client_min_messages=warning")
def tearDown(self):
+ self.doCleanups()
self.db.close()
def testGetTables(self):
@@ -1636,6 +1639,7 @@
r = get_attnames("s4.t4")
self.assertEqual(r, result)
query("drop table if exists s3.t3m")
+ self.addCleanup(query, "drop table s3.t3m")
query("create table s3.t3m with oids as select 1 as m")
result_m = {'oid': 'int', 'm': 'int'}
r = get_attnames("s3.t3m")
@@ -1645,7 +1649,6 @@
self.assertEqual(r, result)
r = get_attnames("t3m")
self.assertEqual(r, result_m)
- query("drop table s3.t3m")
def testGet(self):
get = self.db.get
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql