Author: cito
Date: Sun Jan  6 11:47:08 2013
New Revision: 502

Log:
Test that pgnotify() properly finishes.

Modified:
   trunk/module/TEST_PyGreSQL_classic.py

Modified: trunk/module/TEST_PyGreSQL_classic.py
==============================================================================
--- trunk/module/TEST_PyGreSQL_classic.py       Sun Jan  6 11:22:53 2013        (r501)
+++ trunk/module/TEST_PyGreSQL_classic.py       Sun Jan  6 11:47:08 2013        (r502)
@@ -2,8 +2,9 @@
 
 from __future__ import with_statement
 
-import sys, thread
+import sys
 from time import sleep
+from threading import Thread
 import unittest
 from pg import *
 
@@ -240,19 +241,21 @@
         db2 = opendb()
         arg_dict = {}
         self.notify_timeout = False
-        # Listen for 'event_1'.
+        # Listen for 'event_1'
         pgn = db.pgnotify('event_1', self.notify_callback, arg_dict)
-        thread.start_new_thread(pgn, ())
+        thread = Thread(None, pgn)
+        thread.start()
         # Wait until the thread has started.
-        for n in range(1000):
+        for n in xrange(500):
             if 'event' in arg_dict:
                 break
             sleep(0.01)
         self.assertTrue('event' in arg_dict)
+        self.assertTrue(thread.isAlive())
         # Generate notification from the other connection.
         db2.query("notify event_1, 'payload_1'")
-        # Wait until the thread has ended.
-        for n in range(1000):
+        # Wait until the notification has been caught.
+        for n in xrange(500):
             if arg_dict['event'] == 'event_1':
                 break
             sleep(0.01)
@@ -262,6 +265,23 @@
         # Check that callback has been invoked.
         self.assertTrue(arg_dict.get('called'))
         self.assertFalse(self.notify_timeout)
+        arg_dict['called'] = False
+        self.assertTrue(thread.isAlive())
+        # Generate stop notification
+        db2.query("notify stop_event_1, 'payload_1'")
+        # Wait until the notification has been caught.
+        for n in xrange(500):
+            if arg_dict['event'] == 'stop_event_1':
+                break
+            sleep(0.01)
+        self.assertEqual(arg_dict['event'], 'stop_event_1')
+        self.assertEqual(arg_dict['extra'], 'payload_1')
+        self.assertTrue(isinstance(arg_dict['pid'], int))
+        # Check that callback has been invoked.
+        self.assertTrue(arg_dict.get('called'))
+        self.assertFalse(self.notify_timeout)
+        thread.join(5)
+        self.assertFalse(thread.isAlive())
 
     def test_notify_timeout_DB(self):
         db = opendb()
@@ -269,12 +289,14 @@
         self.notify_timeout = False
         # Listen for 'event_1'.
         pgn = db.pgnotify('event_1', self.notify_callback, arg_dict, 0.01)
-        thread.start_new_thread(pgn, ())
+        thread = Thread(None, pgn)
+        thread.start()
         # Sleep long enough to time out.
         sleep(0.02)
         # Verify that we've indeed timed out.
         self.assertFalse(arg_dict.get('called'))
         self.assertTrue(self.notify_timeout)
+        self.assertFalse(thread.isAlive())
 
     def test_notify(self):
         db = opendb()
@@ -283,17 +305,19 @@
         self.notify_timeout = False
         # Listen for 'event_1'
         pgn = pgnotify(db, 'event_1', self.notify_callback, arg_dict)
-        thread.start_new_thread(pgn, ())
+        thread = Thread(None, pgn)
+        thread.start()
         # Wait until the thread has started.
-        for n in range(1000):
+        for n in xrange(500):
             if 'event' in arg_dict:
                 break
             sleep(0.01)
         self.assertTrue('event' in arg_dict)
+        self.assertTrue(thread.isAlive())
         # Generate notification from the other connection.
         db2.query("notify event_1, 'payload_1'")
-        # Wait until the thread has ended.
-        for n in range(1000):
+        # Wait until the notification has been caught.
+        for n in xrange(500):
             if arg_dict['event'] == 'event_1':
                 break
             sleep(0.01)
@@ -303,6 +327,23 @@
         # Check that callback has been invoked.
         self.assertTrue(arg_dict.get('called'))
         self.assertFalse(self.notify_timeout)
+        arg_dict['called'] = False
+        self.assertTrue(thread.isAlive())
+        # Generate stop notification
+        db2.query("notify stop_event_1, 'payload_1'")
+        # Wait until the notification has been caught.
+        for n in xrange(500):
+            if arg_dict['event'] == 'stop_event_1':
+                break
+            sleep(0.01)
+        self.assertEqual(arg_dict['event'], 'stop_event_1')
+        self.assertEqual(arg_dict['extra'], 'payload_1')
+        self.assertTrue(isinstance(arg_dict['pid'], int))
+        # Check that callback has been invoked.
+        self.assertTrue(arg_dict.get('called'))
+        self.assertFalse(self.notify_timeout)
+        thread.join(5)
+        self.assertFalse(thread.isAlive())
 
     def test_notify_timeout(self):
         db = opendb()
@@ -310,12 +351,15 @@
         self.notify_timeout = False
         # Listen for 'event_1'.
         pgn = pgnotify(db, 'event_1', self.notify_callback, arg_dict, 0.01)
-        thread.start_new_thread(pgn, ())
+        thread = Thread(None, pgn)
+        thread.start()
         # Sleep long enough to time out.
         sleep(0.02)
         # Verify that we've indeed timed out.
         self.assertFalse(arg_dict.get('called'))
         self.assertTrue(self.notify_timeout)
+        self.assertFalse(thread.isAlive())
+
 
 if __name__ == '__main__':
     suite = unittest.TestSuite()
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql

Reply via email to