Author: cito
Date: Wed Nov 25 08:37:33 2015
New Revision: 624
Log:
Avoid the test thread running forever if the test fails
Modified:
trunk/module/TEST_PyGreSQL_classic.py
Modified: trunk/module/TEST_PyGreSQL_classic.py
==============================================================================
--- trunk/module/TEST_PyGreSQL_classic.py Wed Nov 25 07:59:03 2015 (r623)
+++ trunk/module/TEST_PyGreSQL_classic.py Wed Nov 25 08:37:33 2015 (r624)
@@ -261,65 +261,70 @@
arg_dict = dict(event=None, called=False)
self.notify_timeout = False
# Listen for 'event_1'.
- target = fut('event_1', self.notify_callback, arg_dict)
+ target = fut('event_1', self.notify_callback, arg_dict, 5)
thread = Thread(None, target)
thread.start()
- # Wait until the thread has started.
- for n in range(500):
- if target.listening:
- break
- sleep(0.01)
- self.assertTrue(target.listening)
- self.assertTrue(thread.isAlive())
- # Open another connection for sending notifications.
- db2 = opendb()
- # Generate notification from the other connection.
- if two_payloads:
- db2.begin()
- if call_notify:
+ try:
+ # Wait until the thread has started.
+ for n in range(500):
+ if target.listening:
+ break
+ sleep(0.01)
+ self.assertTrue(target.listening)
+ self.assertTrue(thread.isAlive())
+ # Open another connection for sending notifications.
+ db2 = opendb()
+ # Generate notification from the other connection.
if two_payloads:
- target.notify(db2, payload='payload 0')
- target.notify(db2, payload='payload 1')
- else:
+ db2.begin()
+ if call_notify:
+ if two_payloads:
+ target.notify(db2, payload='payload 0')
+ target.notify(db2, payload='payload 1')
+ else:
+ if two_payloads:
+ db2.query("notify event_1, 'payload 0'")
+ db2.query("notify event_1, 'payload 1'")
if two_payloads:
- db2.query("notify event_1, 'payload 0'")
- db2.query("notify event_1, 'payload 1'")
- if two_payloads:
- db2.commit()
- # Wait until the notification has been caught.
- for n in range(500):
- if arg_dict['called'] or self.notify_timeout:
- break
- sleep(0.01)
- # Check that callback has been invoked.
- self.assertTrue(arg_dict['called'])
- self.assertEqual(arg_dict['event'], 'event_1')
- self.assertEqual(arg_dict['extra'], 'payload 1')
- self.assertTrue(isinstance(arg_dict['pid'], int))
- self.assertFalse(self.notify_timeout)
- arg_dict['called'] = False
- self.assertTrue(thread.isAlive())
- # Generate stop notification.
- if call_notify:
- target.notify(db2, stop=True, payload='payload 2')
- else:
- db2.query("notify stop_event_1, 'payload 2'")
- db2.close()
- # Wait until the notification has been caught.
- for n in range(500):
- if arg_dict['called'] or self.notify_timeout:
- break
- sleep(0.01)
- # Check that callback has been invoked.
- self.assertTrue(arg_dict['called'])
- self.assertEqual(arg_dict['event'], 'stop_event_1')
- self.assertEqual(arg_dict['extra'], 'payload 2')
- self.assertTrue(isinstance(arg_dict['pid'], int))
- self.assertFalse(self.notify_timeout)
- thread.join(5)
- self.assertFalse(thread.isAlive())
- self.assertFalse(target.listening)
- target.close()
+ db2.commit()
+ # Wait until the notification has been caught.
+ for n in range(500):
+ if arg_dict['called'] or self.notify_timeout:
+ break
+ sleep(0.01)
+ # Check that callback has been invoked.
+ self.assertTrue(arg_dict['called'])
+ self.assertEqual(arg_dict['event'], 'event_1')
+ self.assertEqual(arg_dict['extra'], 'payload 1')
+ self.assertTrue(isinstance(arg_dict['pid'], int))
+ self.assertFalse(self.notify_timeout)
+ arg_dict['called'] = False
+ self.assertTrue(thread.isAlive())
+ # Generate stop notification.
+ if call_notify:
+ target.notify(db2, stop=True, payload='payload 2')
+ else:
+ db2.query("notify stop_event_1, 'payload 2'")
+ db2.close()
+ # Wait until the notification has been caught.
+ for n in range(500):
+ if arg_dict['called'] or self.notify_timeout:
+ break
+ sleep(0.01)
+ # Check that callback has been invoked.
+ self.assertTrue(arg_dict['called'])
+ self.assertEqual(arg_dict['event'], 'stop_event_1')
+ self.assertEqual(arg_dict['extra'], 'payload 2')
+ self.assertTrue(isinstance(arg_dict['pid'], int))
+ self.assertFalse(self.notify_timeout)
+ thread.join(5)
+ self.assertFalse(thread.isAlive())
+ self.assertFalse(target.listening)
+ target.close()
+ except Exception:
+ target.close()
+ if thread.is_alive():
+ thread.join(5)
def test_notify_other_options(self):
for run_as_method in False, True:
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql