Author: cito
Date: Thu Jan 14 15:26:30 2016
New Revision: 744

Log:
Use cleanup feature in unittests

Modified:
   trunk/tests/test_classic_connection.py
   trunk/tests/test_classic_dbwrapper.py

Modified: trunk/tests/test_classic_connection.py
==============================================================================
--- trunk/tests/test_classic_connection.py      Thu Jan 14 12:32:01 2016        
(r743)
+++ trunk/tests/test_classic_connection.py      Thu Jan 14 15:26:30 2016        
(r744)
@@ -264,6 +264,7 @@
         self.c = connect()
 
     def tearDown(self):
+        self.doCleanups()
         self.c.close()
 
     def testClassName(self):
@@ -506,6 +507,7 @@
     def testQuery(self):
         query = self.c.query
         query("drop table if exists test_table")
+        self.addCleanup(query, "drop table test_table")
         q = "create table test_table (n integer) with oids"
         r = query(q)
         self.assertIsNone(r)
@@ -536,7 +538,6 @@
         r = query(q)
         self.assertIsInstance(r, str)
         self.assertEqual(r, '5')
-        query("drop table test_table")
 
 
 class TestUnicodeQueries(unittest.TestCase):
@@ -1204,6 +1205,7 @@
         self.c = connect()
 
     def tearDown(self):
+        self.doCleanups()
         self.c.close()
 
     def testGetNotify(self):
@@ -1250,31 +1252,29 @@
         self.assertIs(self.c.get_notice_receiver(), r)
 
     def testNoticeReceiver(self):
+        self.addCleanup(self.c.query, 'drop function bilbo_notice();')
         self.c.query('''create function bilbo_notice() returns void AS $$
             begin
                 raise warning 'Bilbo was here!';
             end;
             $$ language plpgsql''')
-        try:
-            received = {}
+        received = {}
 
-            def notice_receiver(notice):
-                for attr in dir(notice):
-                    if attr.startswith('__'):
-                        continue
-                    value = getattr(notice, attr)
-                    if isinstance(value, str):
-                        value = value.replace('WARNUNG', 'WARNING')
-                    received[attr] = value
-
-            self.c.set_notice_receiver(notice_receiver)
-            self.c.query('''select bilbo_notice()''')
-            self.assertEqual(received, dict(
-                pgcnx=self.c, message='WARNING:  Bilbo was here!\n',
-                severity='WARNING', primary='Bilbo was here!',
-                detail=None, hint=None))
-        finally:
-            self.c.query('''drop function bilbo_notice();''')
+        def notice_receiver(notice):
+            for attr in dir(notice):
+                if attr.startswith('__'):
+                    continue
+                value = getattr(notice, attr)
+                if isinstance(value, str):
+                    value = value.replace('WARNUNG', 'WARNING')
+                received[attr] = value
+
+        self.c.set_notice_receiver(notice_receiver)
+        self.c.query('select bilbo_notice()')
+        self.assertEqual(received, dict(
+            pgcnx=self.c, message='WARNING:  Bilbo was here!\n',
+            severity='WARNING', primary='Bilbo was here!',
+            detail=None, hint=None))
 
 
 class TestConfigFunctions(unittest.TestCase):

Modified: trunk/tests/test_classic_dbwrapper.py
==============================================================================
--- trunk/tests/test_classic_dbwrapper.py       Thu Jan 14 12:32:01 2016        
(r743)
+++ trunk/tests/test_classic_dbwrapper.py       Thu Jan 14 15:26:30 2016        
(r744)
@@ -309,6 +309,7 @@
         query('set bytea_output=hex')
 
     def tearDown(self):
+        self.doCleanups()
         self.db.close()
 
     def testClassName(self):
@@ -415,6 +416,7 @@
         get_attnames = self.db.get_attnames
         query = self.db.query
         query("drop table if exists test_table")
+        self.addCleanup(query, "drop table test_table")
         query("create table test_table("
             " n int, alpha smallint, beta bool,"
             " gamma char(5), tau text, v varchar(3))")
@@ -423,13 +425,13 @@
         self.assertEquals(r, dict(
             n='int', alpha='int', beta='bool',
             gamma='text', tau='text', v='text'))
-        query("drop table test_table")
 
     def testGetAttnamesWithQuotes(self):
         get_attnames = self.db.get_attnames
         query = self.db.query
         table = 'test table for get_attnames()'
         query('drop table if exists "%s"' % table)
+        self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s"('
             '"Prime!" smallint,'
             '"much space" integer, "Questions?" text)' % table)
@@ -437,12 +439,12 @@
         self.assertIsInstance(r, dict)
         self.assertEquals(r, {
             'Prime!': 'int', 'much space': 'int', 'Questions?': 'text'})
-        query('drop table "%s"' % table)
 
     def testGetAttnamesWithRegtypes(self):
         get_attnames = self.db.get_attnames
         query = self.db.query
         query("drop table if exists test_table")
+        self.addCleanup(query, "drop table test_table")
         query("create table test_table("
             " n int, alpha smallint, beta bool,"
             " gamma char(5), tau text, v varchar(3))")
@@ -455,12 +457,12 @@
         self.assertEquals(r, dict(
             n='integer', alpha='smallint', beta='boolean',
             gamma='character', tau='text', v='character varying'))
-        query("drop table test_table")
 
     def testGetAttnamesIsCached(self):
         get_attnames = self.db.get_attnames
         query = self.db.query
         query("drop table if exists test_table")
+        self.addCleanup(query, "drop table if exists test_table")
         query("create table test_table(col int)")
         r = get_attnames("test_table")
         self.assertIsInstance(r, dict)
@@ -481,6 +483,7 @@
         get_attnames = self.db.get_attnames
         query = self.db.query
         query("drop table if exists test_table")
+        self.addCleanup(query, "drop table test_table")
         query("create table test_table("
             " n int, alpha smallint, v varchar(3),"
             " gamma char(5), tau text, beta bool)")
@@ -489,7 +492,6 @@
         self.assertEquals(r, OrderedDict([
             ('n', 'int'), ('alpha', 'int'), ('v', 'text'),
             ('gamma', 'text'), ('tau', 'text'), ('beta', 'bool')]))
-        query("drop table test_table")
         if OrderedDict is dict:
             self.skipTest('OrderedDict is not supported')
         r = ' '.join(list(r.keys()))
@@ -498,6 +500,7 @@
     def testQuery(self):
         query = self.db.query
         query("drop table if exists test_table")
+        self.addCleanup(query, "drop table test_table")
         q = "create table test_table (n integer) with oids"
         r = query(q)
         self.assertIsNone(r)
@@ -527,7 +530,6 @@
         r = query(q)
         self.assertIsInstance(r, str)
         self.assertEqual(r, '5')
-        query("drop table test_table")
 
     def testMultipleQueries(self):
         self.assertEqual(self.db.query(
@@ -538,6 +540,7 @@
     def testQueryWithParams(self):
         query = self.db.query
         query("drop table if exists test_table")
+        self.addCleanup(query, "drop table test_table")
         q = "create table test_table (n1 integer, n2 integer) with oids"
         query(q)
         q = "insert into test_table values ($1, $2)"
@@ -561,7 +564,6 @@
         q = "delete from test_table where n2!=$1"
         r = query(q, 4)
         self.assertEqual(r, '3')
-        query("drop table test_table")
 
     def testEmptyQuery(self):
         self.assertRaises(ValueError, self.db.query, '')
@@ -578,6 +580,7 @@
         for t in ('pkeytest', 'primary key test'):
             for n in range(7):
                 query('drop table if exists "%s%d"' % (t, n))
+                self.addCleanup(query, 'drop table "%s%d"' % (t, n))
             query('create table "%s0" ('
                 "a smallint)" % t)
             query('create table "%s1" ('
@@ -619,8 +622,6 @@
             self.assertEqual(pkey('%s1' % t), 'b')
             # we get the changed primary key when the cache is flushed
             self.assertEqual(pkey('%s1' % t, flush=True), 'x')
-            for n in range(7):
-                query('drop table "%s%d"' % (t, n))
 
     def testGetDatabases(self):
         databases = self.db.get_databases()
@@ -692,6 +693,7 @@
             self.db.get_attnames, 'has.too.many.dots')
         for table in ('attnames_test_table', 'test table for attnames'):
             self.db.query('drop table if exists "%s"' % table)
+            self.addCleanup(self.db.query, 'drop table "%s"' % table)
             self.db.query('create table "%s" ('
                 'a smallint, b integer, c bigint, '
                 'e numeric, f float, f2 double precision, m money, '
@@ -706,7 +708,6 @@
                 'u': 'text', 't': 'text', 'v': 'text',
                 'y': 'int', 'x': 'int', 'z': 'int', 'oid': 'int'}
             self.assertEqual(attributes, result)
-            self.db.query('drop table "%s"' % table)
 
     def testHasTablePrivilege(self):
         can = self.db.has_table_privilege
@@ -727,6 +728,7 @@
         query = self.db.query
         table = 'get_test_table'
         query('drop table if exists "%s"' % table)
+        self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s" ('
             "n integer, t text) with oids" % table)
         for n, t in enumerate('xyz'):
@@ -760,22 +762,22 @@
         self.assertEqual(get(table, r)['t'], 'z')
         r['n'] = 2
         self.assertEqual(get(table, r)['t'], 'y')
-        query('drop table "%s"' % table)
 
     def testGetWithCompositeKey(self):
         get = self.db.get
         query = self.db.query
         table = 'get_test_table_1'
         query('drop table if exists "%s"' % table)
+        self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s" ('
             "n integer, t text, primary key (n))" % table)
         for n, t in enumerate('abc'):
             query('insert into "%s" values('
                 "%d, '%s')" % (table, n + 1, t))
         self.assertEqual(get(table, 2)['t'], 'b')
-        query('drop table "%s"' % table)
         table = 'get_test_table_2'
         query('drop table if exists "%s"' % table)
+        self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s" ('
             "n integer, m integer, t text, primary key (n, m))" % table)
         for n in range(3):
@@ -789,13 +791,13 @@
         self.assertEqual(r['t'], 'b')
         r = get(table, dict(n=3, m=2), frozenset(['n', 'm']))
         self.assertEqual(r['t'], 'f')
-        query('drop table "%s"' % table)
 
     def testGetWithQuotedNames(self):
         get = self.db.get
         query = self.db.query
         table = 'test table for get()'
         query('drop table if exists "%s"' % table)
+        self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s" ('
             '"Prime!" smallint primary key,'
             '"much space" integer, "Questions?" text)' % table)
@@ -806,7 +808,6 @@
         self.assertEqual(r['Prime!'], 17)
         self.assertEqual(r['much space'], 1001)
         self.assertEqual(r['Questions?'], 'No!')
-        query('drop table "%s"' % table)
 
     def testGetFromView(self):
         self.db.query('delete from test where i4=14')
@@ -820,6 +821,7 @@
         get = self.db.get
         query = self.db.query
         query("drop table if exists test_students")
+        self.addCleanup(query, "drop table test_students")
         query("create table test_students (firstname varchar primary key,"
             " nickname varchar, grade char(2))")
         query("insert into test_students values ("
@@ -853,7 +855,6 @@
         r = query(q).getresult()
         self.assertEqual(len(r), 3)
         self.assertEqual(r[1][2], 'D-')
-        query('drop table test_students')
 
     def testInsert(self):
         insert = self.db.insert
@@ -862,6 +863,7 @@
         decimal = pg.get_decimal()
         table = 'insert_test_table'
         query('drop table if exists "%s"' % table)
+        self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s" ('
             "i2 smallint, i4 integer, i8 bigint,"
             " d numeric, f4 real, f8 double precision, m money,"
@@ -957,13 +959,13 @@
                 if item[0] in expect)
             self.assertEqual(data, expect)
             query('delete from "%s"' % table)
-        query('drop table "%s"' % table)
 
     def testInsertWithQuotedNames(self):
         insert = self.db.insert
         query = self.db.query
         table = 'test table for insert()'
         query('drop table if exists "%s"' % table)
+        self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s" ('
             '"Prime!" smallint primary key,'
             '"much space" integer, "Questions?" text)' % table)
@@ -979,13 +981,13 @@
         self.assertEqual(r['Prime!'], 11)
         self.assertEqual(r['much space'], 2002)
         self.assertEqual(r['Questions?'], 'What?')
-        query('drop table "%s"' % table)
 
     def testUpdate(self):
         update = self.db.update
         query = self.db.query
         table = 'update_test_table'
         query('drop table if exists "%s"' % table)
+        self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s" ('
             "n integer, t text) with oids" % table)
         for n, t in enumerate('xyz'):
@@ -999,13 +1001,13 @@
         q = 'select t from "%s" where n=2' % table
         r = query(q).getresult()[0][0]
         self.assertEqual(r, 'u')
-        query('drop table "%s"' % table)
 
     def testUpdateWithCompositeKey(self):
         update = self.db.update
         query = self.db.query
         table = 'update_test_table_1'
         query('drop table if exists "%s"' % table)
+        self.addCleanup(query, 'drop table if exists "%s"' % table)
         query('create table "%s" ('
             "n integer, t text, primary key (n))" % table)
         for n, t in enumerate('abc'):
@@ -1048,13 +1050,13 @@
         q = 'select t from "%s" where n=2 order by m' % table
         r = [r[0] for r in query(q).getresult()]
         self.assertEqual(r, ['c', 'x'])
-        query('drop table "%s"' % table)
 
     def testUpdateWithQuotedNames(self):
         update = self.db.update
         query = self.db.query
         table = 'test table for update()'
         query('drop table if exists "%s"' % table)
+        self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s" ('
             '"Prime!" smallint primary key,'
             '"much space" integer, "Questions?" text)' % table)
@@ -1072,13 +1074,13 @@
         self.assertEqual(r['Prime!'], 13)
         self.assertEqual(r['much space'], 7007)
         self.assertEqual(r['Questions?'], 'When?')
-        query('drop table "%s"' % table)
 
     def testUpsert(self):
         upsert = self.db.upsert
         query = self.db.query
         table = 'upsert_test_table'
         query('drop table if exists "%s"' % table)
+        self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s" ('
             "n integer primary key, t text) with oids" % table)
         s = dict(n=1, t='x')
@@ -1141,13 +1143,13 @@
         self.assertEqual(r['t'], 'x2')
         r = query(q).getresult()
         self.assertEqual(r, [(1, 'x2'), (2, 'y3')])
-        query('drop table "%s"' % table)
 
     def testUpsertWithCompositeKey(self):
         upsert = self.db.upsert
         query = self.db.query
         table = 'upsert_test_table_2'
         query('drop table if exists "%s"' % table)
+        self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s" ('
             "n integer, m integer, t text, primary key (n, m))" % table)
         s = dict(n=1, m=2, t='x')
@@ -1210,13 +1212,13 @@
         self.assertEqual(r['t'], 'nm')
         r = query(q).getresult()
         self.assertEqual(r, [(1, 2, 'x'), (1, 3, 'nm'), (2, 3, 'y')])
-        query('drop table "%s"' % table)
 
     def testUpsertWithQuotedNames(self):
         upsert = self.db.upsert
         query = self.db.query
         table = 'test table for upsert()'
         query('drop table if exists "%s"' % table)
+        self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s" ('
             '"Prime!" smallint primary key,'
             '"much space" integer, "Questions?" text)' % table)
@@ -1249,6 +1251,7 @@
         f = False if pg.get_bool() else 'f'
         table = 'clear_test_table'
         query('drop table if exists "%s"' % table)
+        self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s" ('
             "n integer, b boolean, d date, t text)" % table)
         r = clear(table)
@@ -1262,13 +1265,13 @@
         result = {'a': 1, 'n': 0, 'b': f, 'd': '', 't': '',
             'oid': long(1)}
         self.assertEqual(r, result)
-        query('drop table "%s"' % table)
 
     def testClearWithQuotedNames(self):
         clear = self.db.clear
         query = self.db.query
         table = 'test table for clear()'
         query('drop table if exists "%s"' % table)
+        self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s" ('
             '"Prime!" smallint primary key,'
             '"much space" integer, "Questions?" text)' % table)
@@ -1277,13 +1280,13 @@
         self.assertEqual(r['Prime!'], 0)
         self.assertEqual(r['much space'], 0)
         self.assertEqual(r['Questions?'], '')
-        query('drop table "%s"' % table)
 
     def testDelete(self):
         delete = self.db.delete
         query = self.db.query
         table = 'delete_test_table'
         query('drop table if exists "%s"' % table)
+        self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s" ('
             "n integer, t text) with oids" % table)
         for n, t in enumerate('xyz'):
@@ -1309,12 +1312,12 @@
         s = delete(table, r)
         self.assertEqual(s, 0)
         self.assertRaises(pg.DatabaseError, self.db.get, table, 2, 'n')
-        query('drop table "%s"' % table)
 
     def testDeleteWithCompositeKey(self):
         query = self.db.query
         table = 'delete_test_table_1'
         query('drop table if exists "%s"' % table)
+        self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s" ('
             "n integer, t text, primary key (n))" % table)
         for n, t in enumerate('abc'):
@@ -1330,9 +1333,9 @@
         r = query('select t from "%s" where n=3' % table
                   ).getresult()[0][0]
         self.assertEqual(r, 'c')
-        query('drop table "%s"' % table)
         table = 'delete_test_table_2'
         query('drop table if exists "%s"' % table)
+        self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s" ('
             "n integer, m integer, t text, primary key (n, m))" % table)
         for n in range(3):
@@ -1354,13 +1357,13 @@
         r = [r[0] for r in query('select t from "%s" where n=3'
             ' order by m' % table).getresult()]
         self.assertEqual(r, ['f'])
-        query('drop table "%s"' % table)
 
     def testDeleteWithQuotedNames(self):
         delete = self.db.delete
         query = self.db.query
         table = 'test table for delete()'
         query('drop table if exists "%s"' % table)
+        self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s" ('
             '"Prime!" smallint primary key,'
             '"much space" integer, "Questions?" text)' % table)
@@ -1376,11 +1379,11 @@
         self.assertEqual(r, 1)
         r = query('select count(*) from "%s"' % table).getresult()
         self.assertEqual(r[0][0], 0)
-        query('drop table "%s"' % table)
 
     def testTransaction(self):
         query = self.db.query
         query("drop table if exists test_table")
+        self.addCleanup(query, "drop table test_table")
         query("create table test_table (n integer)")
         self.db.begin()
         query("insert into test_table values (1)")
@@ -1409,11 +1412,11 @@
         r = [r[0] for r in query(
             "select * from test_table order by 1").getresult()]
         self.assertEqual(r, [1, 2, 5, 7, 9])
-        query("drop table test_table")
 
     def testContextManager(self):
         query = self.db.query
         query("drop table if exists test_table")
+        self.addCleanup(query, "drop table test_table")
         query("create table test_table (n integer check(n>0))")
         with self.db:
             query("insert into test_table values (1)")
@@ -1438,11 +1441,11 @@
         r = [r[0] for r in query(
             "select * from test_table order by 1").getresult()]
         self.assertEqual(r, [1, 2, 5, 7])
-        query("drop table test_table")
 
     def testBytea(self):
         query = self.db.query
         query('drop table if exists bytea_test')
+        self.addCleanup(query, 'drop table bytea_test')
         query('create table bytea_test (n smallint primary key, data bytea)')
         s = b"It's all \\ kinds \x00 of\r nasty \xff stuff!\n"
         r = self.db.escape_bytea(s)
@@ -1457,11 +1460,11 @@
         r = self.db.unescape_bytea(r)
         self.assertIsInstance(r, bytes)
         self.assertEqual(r, s)
-        query('drop table bytea_test')
 
     def testInsertUpdateGetBytea(self):
         query = self.db.query
         query('drop table if exists bytea_test')
+        self.addCleanup(query, 'drop table bytea_test')
         query('create table bytea_test (n smallint primary key, data bytea)')
         # insert null value
         r = self.db.insert('bytea_test', n=0, data=None)
@@ -1519,7 +1522,6 @@
         r = r['data']
         self.assertIsInstance(r, bytes)
         self.assertEqual(r, s)
-        query('drop table bytea_test')
 
     def testDebugWithCallable(self):
         if debug:
@@ -1614,6 +1616,7 @@
         self.db.query("set client_min_messages=warning")
 
     def tearDown(self):
+        self.doCleanups()
         self.db.close()
 
     def testGetTables(self):
@@ -1636,6 +1639,7 @@
         r = get_attnames("s4.t4")
         self.assertEqual(r, result)
         query("drop table if exists s3.t3m")
+        self.addCleanup(query, "drop table s3.t3m")
         query("create table s3.t3m with oids as select 1 as m")
         result_m = {'oid': 'int', 'm': 'int'}
         r = get_attnames("s3.t3m")
@@ -1645,7 +1649,6 @@
         self.assertEqual(r, result)
         r = get_attnames("t3m")
         self.assertEqual(r, result_m)
-        query("drop table s3.t3m")
 
     def testGet(self):
         get = self.db.get
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql

Reply via email to