Author: cito
Date: Sun Jan  6 11:22:53 2013
New Revision: 501

Log:
Speed up the pgnotify tests, avoid globals.

Modified:
   trunk/module/TEST_PyGreSQL_classic.py

Modified: trunk/module/TEST_PyGreSQL_classic.py
==============================================================================
--- trunk/module/TEST_PyGreSQL_classic.py       Sun Jan  6 11:05:32 2013        (r500)
+++ trunk/module/TEST_PyGreSQL_classic.py       Sun Jan  6 11:22:53 2013        (r501)
@@ -2,7 +2,8 @@
 
 from __future__ import with_statement
 
-import sys, thread, time
+import sys, thread
+from time import sleep
 import unittest
 from pg import *
 
@@ -25,12 +26,6 @@
     db.query("SET STANDARD_CONFORMING_STRINGS=FALSE")
     return db
 
-def cb1(arg_dict):
-    global cb1_return
-    if arg_dict is None:
-        cb1_return = 'timed out'
-    else:
-        cb1_return = arg_dict
 
 class UtilityTest(unittest.TestCase):
 
@@ -234,58 +229,93 @@
     # note that notify can be created as part of the DB class or
     # independently.
 
-    def test_notify_DB(self):
-        global cb1_return
+    def notify_callback(self, arg_dict):
+        if arg_dict:
+            arg_dict['called'] = True
+        else:
+            self.notify_timeout = True
 
+    def test_notify_DB(self):
         db = opendb()
         db2 = opendb()
-        # Listen for 'event_1'
-        pgn = db2.pgnotify('event_1', cb1)
+        arg_dict = {}
+        self.notify_timeout = False
+        # Listen for 'event_1'.
+        pgn = db.pgnotify('event_1', self.notify_callback, arg_dict)
         thread.start_new_thread(pgn, ())
-        time.sleep(1)
+        # Wait until the thread has started.
+        for n in range(1000):
+            if 'event' in arg_dict:
+                break
+            sleep(0.01)
+        self.assertTrue('event' in arg_dict)
         # Generate notification from the other connection.
-        db.query('notify event_1')
-        time.sleep(1)
+        db2.query("notify event_1, 'payload_1'")
+        # Wait until the thread has ended.
+        for n in range(1000):
+            if arg_dict['event'] == 'event_1':
+                break
+            sleep(0.01)
+        self.assertEqual(arg_dict['event'], 'event_1')
+        self.assertEqual(arg_dict['extra'], 'payload_1')
+        self.assertTrue(isinstance(arg_dict['pid'], int))
         # Check that callback has been invoked.
-        self.assertEquals(cb1_return['event'], 'event_1')
+        self.assertTrue(arg_dict.get('called'))
+        self.assertFalse(self.notify_timeout)
 
     def test_notify_timeout_DB(self):
         db = opendb()
-        db2 = opendb()
-        global cb1_return
-        # Listen for 'event_1'
-        pgn = db2.pgnotify('event_1', cb1, {}, 1)
+        arg_dict = {}
+        self.notify_timeout = False
+        # Listen for 'event_1'.
+        pgn = db.pgnotify('event_1', self.notify_callback, arg_dict, 0.01)
         thread.start_new_thread(pgn, ())
         # Sleep long enough to time out.
-        time.sleep(2)
+        sleep(0.02)
         # Verify that we've indeed timed out.
-        self.assertEquals(cb1_return, 'timed out')
+        self.assertFalse(arg_dict.get('called'))
+        self.assertTrue(self.notify_timeout)
 
     def test_notify(self):
         db = opendb()
         db2 = opendb()
-        global cb1_return
+        arg_dict = {}
+        self.notify_timeout = False
         # Listen for 'event_1'
-        pgn = pgnotify(db2, 'event_1', cb1)
+        pgn = pgnotify(db, 'event_1', self.notify_callback, arg_dict)
         thread.start_new_thread(pgn, ())
-        time.sleep(1)
+        # Wait until the thread has started.
+        for n in range(1000):
+            if 'event' in arg_dict:
+                break
+            sleep(0.01)
+        self.assertTrue('event' in arg_dict)
         # Generate notification from the other connection.
-        db.query('notify event_1')
-        time.sleep(1)
+        db2.query("notify event_1, 'payload_1'")
+        # Wait until the thread has ended.
+        for n in range(1000):
+            if arg_dict['event'] == 'event_1':
+                break
+            sleep(0.01)
+        self.assertEqual(arg_dict['event'], 'event_1')
+        self.assertEqual(arg_dict['extra'], 'payload_1')
+        self.assertTrue(isinstance(arg_dict['pid'], int))
         # Check that callback has been invoked.
-        self.assertEquals(cb1_return['event'], 'event_1')
+        self.assertTrue(arg_dict.get('called'))
+        self.assertFalse(self.notify_timeout)
 
     def test_notify_timeout(self):
         db = opendb()
-        db2 = opendb()
-        global cb1_return
-        # Listen for 'event_1'
-        pgn = pgnotify(db2, 'event_1', cb1, {}, 1)
+        arg_dict = {}
+        self.notify_timeout = False
+        # Listen for 'event_1'.
+        pgn = pgnotify(db, 'event_1', self.notify_callback, arg_dict, 0.01)
         thread.start_new_thread(pgn, ())
         # Sleep long enough to time out.
-        time.sleep(2)
+        sleep(0.02)
         # Verify that we've indeed timed out.
-        self.assertEquals(cb1_return, 'timed out')
+        self.assertFalse(arg_dict.get('called'))
+        self.assertTrue(self.notify_timeout)
 
 if __name__ == '__main__':
     suite = unittest.TestSuite()
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql

Reply via email to