Author: cito
Date: Sun Jan 6 11:47:08 2013
New Revision: 502
Log:
Test that pgnotify() properly finishes.
Modified:
trunk/module/TEST_PyGreSQL_classic.py
Modified: trunk/module/TEST_PyGreSQL_classic.py
==============================================================================
--- trunk/module/TEST_PyGreSQL_classic.py Sun Jan 6 11:22:53 2013 (r501)
+++ trunk/module/TEST_PyGreSQL_classic.py Sun Jan 6 11:47:08 2013 (r502)
@@ -2,8 +2,9 @@
from __future__ import with_statement
-import sys, thread
+import sys
from time import sleep
+from threading import Thread
import unittest
from pg import *
@@ -240,19 +241,21 @@
db2 = opendb()
arg_dict = {}
self.notify_timeout = False
- # Listen for 'event_1'.
+ # Listen for 'event_1'
pgn = db.pgnotify('event_1', self.notify_callback, arg_dict)
- thread.start_new_thread(pgn, ())
+ thread = Thread(None, pgn)
+ thread.start()
# Wait until the thread has started.
- for n in range(1000):
+ for n in xrange(500):
if 'event' in arg_dict:
break
sleep(0.01)
self.assertTrue('event' in arg_dict)
+ self.assertTrue(thread.isAlive())
# Generate notification from the other connection.
db2.query("notify event_1, 'payload_1'")
- # Wait until the thread has ended.
- for n in range(1000):
+ # Wait until the notification has been caught.
+ for n in xrange(500):
if arg_dict['event'] == 'event_1':
break
sleep(0.01)
@@ -262,6 +265,23 @@
# Check that callback has been invoked.
self.assertTrue(arg_dict.get('called'))
self.assertFalse(self.notify_timeout)
+ arg_dict['called'] = False
+ self.assertTrue(thread.isAlive())
+ # Generate stop notification
+ db2.query("notify stop_event_1, 'payload_1'")
+ # Wait until the notification has been caught.
+ for n in xrange(500):
+ if arg_dict['event'] == 'stop_event_1':
+ break
+ sleep(0.01)
+ self.assertEqual(arg_dict['event'], 'stop_event_1')
+ self.assertEqual(arg_dict['extra'], 'payload_1')
+ self.assertTrue(isinstance(arg_dict['pid'], int))
+ # Check that callback has been invoked.
+ self.assertTrue(arg_dict.get('called'))
+ self.assertFalse(self.notify_timeout)
+ thread.join(5)
+ self.assertFalse(thread.isAlive())
def test_notify_timeout_DB(self):
db = opendb()
@@ -269,12 +289,14 @@
self.notify_timeout = False
# Listen for 'event_1'.
pgn = db.pgnotify('event_1', self.notify_callback, arg_dict, 0.01)
- thread.start_new_thread(pgn, ())
+ thread = Thread(None, pgn)
+ thread.start()
# Sleep long enough to time out.
sleep(0.02)
# Verify that we've indeed timed out.
self.assertFalse(arg_dict.get('called'))
self.assertTrue(self.notify_timeout)
+ self.assertFalse(thread.isAlive())
def test_notify(self):
db = opendb()
@@ -283,17 +305,19 @@
self.notify_timeout = False
# Listen for 'event_1'
pgn = pgnotify(db, 'event_1', self.notify_callback, arg_dict)
- thread.start_new_thread(pgn, ())
+ thread = Thread(None, pgn)
+ thread.start()
# Wait until the thread has started.
- for n in range(1000):
+ for n in xrange(500):
if 'event' in arg_dict:
break
sleep(0.01)
self.assertTrue('event' in arg_dict)
+ self.assertTrue(thread.isAlive())
# Generate notification from the other connection.
db2.query("notify event_1, 'payload_1'")
- # Wait until the thread has ended.
- for n in range(1000):
+ # Wait until the notification has been caught.
+ for n in xrange(500):
if arg_dict['event'] == 'event_1':
break
sleep(0.01)
@@ -303,6 +327,23 @@
# Check that callback has been invoked.
self.assertTrue(arg_dict.get('called'))
self.assertFalse(self.notify_timeout)
+ arg_dict['called'] = False
+ self.assertTrue(thread.isAlive())
+ # Generate stop notification
+ db2.query("notify stop_event_1, 'payload_1'")
+ # Wait until the notification has been caught.
+ for n in xrange(500):
+ if arg_dict['event'] == 'stop_event_1':
+ break
+ sleep(0.01)
+ self.assertEqual(arg_dict['event'], 'stop_event_1')
+ self.assertEqual(arg_dict['extra'], 'payload_1')
+ self.assertTrue(isinstance(arg_dict['pid'], int))
+ # Check that callback has been invoked.
+ self.assertTrue(arg_dict.get('called'))
+ self.assertFalse(self.notify_timeout)
+ thread.join(5)
+ self.assertFalse(thread.isAlive())
def test_notify_timeout(self):
db = opendb()
@@ -310,12 +351,15 @@
self.notify_timeout = False
# Listen for 'event_1'.
pgn = pgnotify(db, 'event_1', self.notify_callback, arg_dict, 0.01)
- thread.start_new_thread(pgn, ())
+ thread = Thread(None, pgn)
+ thread.start()
# Sleep long enough to time out.
sleep(0.02)
# Verify that we've indeed timed out.
self.assertFalse(arg_dict.get('called'))
self.assertTrue(self.notify_timeout)
+ self.assertFalse(thread.isAlive())
+
if __name__ == '__main__':
suite = unittest.TestSuite()
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql