Author: cito
Date: Sun Jan 6 11:22:53 2013
New Revision: 501
Log:
Speed up the pgnotify tests, avoid globals.
Modified:
trunk/module/TEST_PyGreSQL_classic.py
Modified: trunk/module/TEST_PyGreSQL_classic.py
==============================================================================
--- trunk/module/TEST_PyGreSQL_classic.py Sun Jan 6 11:05:32 2013 (r500)
+++ trunk/module/TEST_PyGreSQL_classic.py Sun Jan 6 11:22:53 2013 (r501)
@@ -2,7 +2,8 @@
from __future__ import with_statement
-import sys, thread, time
+import sys, thread
+from time import sleep
import unittest
from pg import *
@@ -25,12 +26,6 @@
db.query("SET STANDARD_CONFORMING_STRINGS=FALSE")
return db
-def cb1(arg_dict):
- global cb1_return
- if arg_dict is None:
- cb1_return = 'timed out'
- else:
- cb1_return = arg_dict
class UtilityTest(unittest.TestCase):
@@ -234,58 +229,93 @@
# note that notify can be created as part of the DB class or
# independently.
- def test_notify_DB(self):
- global cb1_return
+ def notify_callback(self, arg_dict):
+ if arg_dict:
+ arg_dict['called'] = True
+ else:
+ self.notify_timeout = True
+ def test_notify_DB(self):
db = opendb()
db2 = opendb()
- # Listen for 'event_1'
- pgn = db2.pgnotify('event_1', cb1)
+ arg_dict = {}
+ self.notify_timeout = False
+ # Listen for 'event_1'.
+ pgn = db.pgnotify('event_1', self.notify_callback, arg_dict)
thread.start_new_thread(pgn, ())
- time.sleep(1)
+ # Wait until the thread has started.
+ for n in range(1000):
+ if 'event' in arg_dict:
+ break
+ sleep(0.01)
+ self.assertTrue('event' in arg_dict)
# Generate notification from the other connection.
- db.query('notify event_1')
- time.sleep(1)
+ db2.query("notify event_1, 'payload_1'")
+ # Wait until the thread has ended.
+ for n in range(1000):
+ if arg_dict['event'] == 'event_1':
+ break
+ sleep(0.01)
+ self.assertEqual(arg_dict['event'], 'event_1')
+ self.assertEqual(arg_dict['extra'], 'payload_1')
+ self.assertTrue(isinstance(arg_dict['pid'], int))
# Check that callback has been invoked.
- self.assertEquals(cb1_return['event'], 'event_1')
+ self.assertTrue(arg_dict.get('called'))
+ self.assertFalse(self.notify_timeout)
def test_notify_timeout_DB(self):
db = opendb()
- db2 = opendb()
- global cb1_return
- # Listen for 'event_1'
- pgn = db2.pgnotify('event_1', cb1, {}, 1)
+ arg_dict = {}
+ self.notify_timeout = False
+ # Listen for 'event_1'.
+ pgn = db.pgnotify('event_1', self.notify_callback, arg_dict, 0.01)
thread.start_new_thread(pgn, ())
# Sleep long enough to time out.
- time.sleep(2)
+ sleep(0.02)
# Verify that we've indeed timed out.
- self.assertEquals(cb1_return, 'timed out')
+ self.assertFalse(arg_dict.get('called'))
+ self.assertTrue(self.notify_timeout)
def test_notify(self):
db = opendb()
db2 = opendb()
- global cb1_return
+ arg_dict = {}
+ self.notify_timeout = False
# Listen for 'event_1'
- pgn = pgnotify(db2, 'event_1', cb1)
+ pgn = pgnotify(db, 'event_1', self.notify_callback, arg_dict)
thread.start_new_thread(pgn, ())
- time.sleep(1)
+ # Wait until the thread has started.
+ for n in range(1000):
+ if 'event' in arg_dict:
+ break
+ sleep(0.01)
+ self.assertTrue('event' in arg_dict)
# Generate notification from the other connection.
- db.query('notify event_1')
- time.sleep(1)
+ db2.query("notify event_1, 'payload_1'")
+ # Wait until the thread has ended.
+ for n in range(1000):
+ if arg_dict['event'] == 'event_1':
+ break
+ sleep(0.01)
+ self.assertEqual(arg_dict['event'], 'event_1')
+ self.assertEqual(arg_dict['extra'], 'payload_1')
+ self.assertTrue(isinstance(arg_dict['pid'], int))
# Check that callback has been invoked.
- self.assertEquals(cb1_return['event'], 'event_1')
+ self.assertTrue(arg_dict.get('called'))
+ self.assertFalse(self.notify_timeout)
def test_notify_timeout(self):
db = opendb()
- db2 = opendb()
- global cb1_return
- # Listen for 'event_1'
- pgn = pgnotify(db2, 'event_1', cb1, {}, 1)
+ arg_dict = {}
+ self.notify_timeout = False
+ # Listen for 'event_1'.
+ pgn = pgnotify(db, 'event_1', self.notify_callback, arg_dict, 0.01)
thread.start_new_thread(pgn, ())
# Sleep long enough to time out.
- time.sleep(2)
+ sleep(0.02)
# Verify that we've indeed timed out.
- self.assertEquals(cb1_return, 'timed out')
+ self.assertFalse(arg_dict.get('called'))
+ self.assertTrue(self.notify_timeout)
if __name__ == '__main__':
suite = unittest.TestSuite()
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql