Author: cito
Date: Wed Jan 27 13:26:41 2016
New Revision: 790
Log:
Add test module for the notification handler
Forgot to check this in to the repository in r762.
Added:
branches/4.x/tests/test_classic_notification.py (contents, props changed)
trunk/tests/test_classic_notification.py (contents, props changed)
Added: branches/4.x/tests/test_classic_notification.py
==============================================================================
--- /dev/null 00:00:00 1970 (empty, because file is newly added)
+++ branches/4.x/tests/test_classic_notification.py Wed Jan 27 13:26:41
2016 (r790)
@@ -0,0 +1,415 @@
+#! /usr/bin/python
+# -*- coding: utf-8 -*-
+
+"""Test the classic PyGreSQL interface.
+
+Sub-tests for the notification handler object.
+
+Contributed by Christoph Zwerschke.
+
+These tests need a database to test against.
+
+"""
+
+try:
+ import unittest2 as unittest # for Python < 2.7
+except ImportError:
+ import unittest
+
+from time import sleep
+from threading import Thread
+
+import pg # the module under test
+
+# We need a database to test against. If LOCAL_PyGreSQL.py exists we will
+# get our information from that. Otherwise we use the defaults.
+# The current user must have create schema privilege on the database.
+dbname = 'unittest'
+dbhost = None
+dbport = 5432
+
+debug = False # let DB wrapper print debugging output
+
+try:
+ from LOCAL_PyGreSQL import *
+except ImportError:
+ pass
+
+
+def DB():
+ """Create a DB wrapper object connecting to the test database."""
+ db = pg.DB(dbname, dbhost, dbport)
+ if debug:
+ db.debug = debug
+ return db
+
+
+class TestSyncNotification(unittest.TestCase):
+ """Test notification handler running in the same thread."""
+
+ def setUp(self):
+ self.db = DB()
+ self.timeout = None
+ self.called = True
+ self.payloads = []
+
+ def tearDown(self):
+ if self.db:
+ self.db.close()
+
+ def callback(self, arg_dict):
+ if arg_dict is None:
+ self.timeout = True
+ else:
+ self.timeout = False
+ self.payloads.append(arg_dict.get('extra'))
+
+ def get_handler(self, event=None, arg_dict=None, stop_event=None):
+ if not event:
+ event = 'test_async_notification'
+ if not stop_event:
+ stop_event = 'stop_async_notification'
+ callback = self.callback
+ handler = self.db.notification_handler(
+ event, callback, arg_dict, 0, stop_event)
+ self.assertEqual(handler.event, event)
+ self.assertEqual(handler.stop_event, stop_event or 'stop_%s' % event)
+ self.assertIs(handler.callback, callback)
+ if arg_dict is None:
+ self.assertEqual(handler.arg_dict, {})
+ else:
+ self.assertIs(handler.arg_dict, arg_dict)
+ self.assertEqual(handler.timeout, 0)
+ self.assertFalse(handler.listening)
+ return handler
+
+ def testCloseHandler(self):
+ handler = self.get_handler()
+ self.assertIs(handler.db, self.db)
+ handler.close()
+ self.assertRaises(pg.InternalError, self.db.close)
+ self.db = None
+ self.assertIs(handler.db, None)
+
+ def testDeleteHandler(self):
+ handler = self.get_handler('test_del')
+ self.assertIs(handler.db, self.db)
+ handler.listen()
+ self.db.query('notify test_del')
+ self.db.query('notify test_del')
+ del handler
+ self.db.query('notify test_del')
+ n = 0
+ while self.db.getnotify() and n < 4:
+ n += 1
+ self.assertEqual(n, 2)
+
+ def testNotify(self):
+ handler = self.get_handler()
+ handler.listen()
+ self.assertRaises(TypeError, handler.notify, invalid=True)
+ handler.notify(payload='baz')
+ handler.notify(stop=True, payload='buz')
+ handler.unlisten()
+ self.db.close()
+ self.db = None
+
+ def testNotifyWithArgsAndPayload(self):
+ arg_dict = {'foo': 'bar'}
+ handler = self.get_handler(arg_dict=arg_dict)
+ self.assertEqual(handler.timeout, 0)
+ handler.listen()
+ handler.notify(payload='baz')
+ handler.notify(payload='biz')
+ handler()
+ self.assertIsNotNone(self.timeout)
+ self.assertFalse(self.timeout)
+ self.assertEqual(self.payloads, ['baz', 'biz'])
+ self.assertEqual(arg_dict['foo'], 'bar')
+ self.assertEqual(arg_dict['event'], handler.event)
+ self.assertIsInstance(arg_dict['pid'], int)
+ self.assertEqual(arg_dict['extra'], 'biz')
+ self.assertTrue(handler.listening)
+ del self.payloads[:]
+ handler.notify(stop=True, payload='buz')
+ handler()
+ self.assertIsNotNone(self.timeout)
+ self.assertFalse(self.timeout)
+ self.assertEqual(self.payloads, ['buz'])
+ self.assertEqual(arg_dict['foo'], 'bar')
+ self.assertEqual(arg_dict['event'], handler.stop_event)
+ self.assertIsInstance(arg_dict['pid'], int)
+ self.assertEqual(arg_dict['extra'], 'buz')
+ self.assertFalse(handler.listening)
+ handler.unlisten()
+
+ def testNotifyWrongEvent(self):
+ handler = self.get_handler('good_event')
+ self.assertEqual(handler.timeout, 0)
+ handler.listen()
+ handler.notify(payload="note 1")
+ self.db.query("notify bad_event, 'note 2'")
+ handler.notify(payload="note 3")
+ handler()
+ self.assertIsNotNone(self.timeout)
+ self.assertFalse(self.timeout)
+ self.assertEqual(self.payloads, ['note 1', 'note 3'])
+ self.assertTrue(handler.listening)
+ del self.payloads[:]
+ self.db.query('listen bad_event')
+ handler.notify(payload="note 4")
+ self.db.query("notify bad_event, 'note 5'")
+ handler.notify(payload="note 6")
+ try:
+ handler()
+ except pg.DatabaseError, error:
+ self.assertEqual(str(error),
+ 'Listening for "good_event" and "stop_good_event",'
+ ' but notified of "bad_event"')
+ self.assertIsNotNone(self.timeout)
+ self.assertFalse(self.timeout)
+ self.assertEqual(self.payloads, ['note 4'])
+ self.assertFalse(handler.listening)
+
+
+class TestAsyncNotification(unittest.TestCase):
+ """Test notification handler running in a separate thread."""
+
+ def setUp(self):
+ self.db = DB()
+
+ def tearDown(self):
+ self.doCleanups()
+ if self.db:
+ self.db.close()
+
+ def callback(self, arg_dict):
+ if arg_dict is None:
+ self.timeout = True
+ elif arg_dict is self.arg_dict:
+ arg_dict = arg_dict.copy()
+ pid = arg_dict.get('pid')
+ if isinstance(pid, int):
+ arg_dict['pid'] = 1
+ self.received.append(arg_dict)
+ else:
+ self.received.append(dict(error=arg_dict))
+
+ def start_handler(self,
+ event=None, arg_dict=None, timeout=5, stop_event=None):
+ db = DB()
+ if not event:
+ event = 'test_async_notification'
+ if not stop_event:
+ stop_event = 'stop_async_notification'
+ callback = self.callback
+ handler = db.notification_handler(
+ event, callback, arg_dict, timeout, stop_event)
+ self.handler = handler
+ self.assertIsInstance(handler, pg.NotificationHandler)
+ self.assertEqual(handler.event, event)
+ self.assertEqual(handler.stop_event, stop_event or 'stop_%s' % event)
+ self.event = handler.event
+ self.assertIs(handler.callback, callback)
+ if arg_dict is None:
+ self.assertEqual(handler.arg_dict, {})
+ else:
+ self.assertIsInstance(handler.arg_dict, dict)
+ self.arg_dict = handler.arg_dict
+ self.assertEqual(handler.timeout, timeout)
+ self.assertFalse(handler.listening)
+ thread = Thread(target=handler, name='test_notification_thread')
+ self.thread = thread
+ thread.start()
+ self.stopped = timeout == 0
+ self.addCleanup(self.stop_handler)
+ for n in range(500):
+ if handler.listening:
+ break
+ sleep(0.01)
+ self.assertTrue(handler.listening)
+ if not self.stopped:
+ self.assertTrue(thread.isAlive())
+ self.timeout = False
+ self.received = []
+ self.sent = []
+
+ def stop_handler(self):
+ handler = self.handler
+ thread = self.thread
+ if not self.stopped and self.handler.listening:
+ self.notify_handler(stop=True)
+ handler.close()
+ self.db = None
+ if thread.isAlive():
+ thread.join(5)
+ self.assertFalse(handler.listening)
+ self.assertFalse(thread.isAlive())
+
+ def notify_handler(self, stop=False, payload=None):
+ event = self.event
+ if stop:
+ event = self.handler.stop_event
+ self.stopped = True
+ arg_dict = self.arg_dict.copy()
+ arg_dict.update(event=event, pid=1, extra=payload or '')
+ self.handler.notify(db=self.db, stop=stop, payload=payload)
+ self.sent.append(arg_dict)
+
+ def notify_query(self, stop=False, payload=None):
+ event = self.event
+ if stop:
+ event = self.handler.stop_event
+ self.stopped = True
+ q = 'notify "%s"' % event
+ if payload:
+ q += ", '%s'" % payload
+ arg_dict = self.arg_dict.copy()
+ arg_dict.update(event=event, pid=1, extra=payload or '')
+ self.db.query(q)
+ self.sent.append(arg_dict)
+
+ def wait(self):
+ for n in range(500):
+ if self.timeout:
+ return False
+ if len(self.received) >= len(self.sent):
+ return True
+ sleep(0.01)
+
+ def receive(self, stop=False):
+ if not self.sent:
+ stop = True
+ if stop:
+ self.notify_handler(stop=True, payload='stop')
+ self.assertTrue(self.wait())
+ self.assertFalse(self.timeout)
+ self.assertEqual(self.received, self.sent)
+ self.received = []
+ self.sent = []
+ self.assertEqual(self.handler.listening, not self.stopped)
+
+ def testNotifyHandlerEmpty(self):
+ self.start_handler()
+ self.notify_handler(stop=True)
+ self.assertEqual(len(self.sent), 1)
+ self.receive()
+
+ def testNotifyQueryEmpty(self):
+ self.start_handler()
+ self.notify_query(stop=True)
+ self.assertEqual(len(self.sent), 1)
+ self.receive()
+
+ def testNotifyHandlerOnce(self):
+ self.start_handler()
+ self.notify_handler()
+ self.assertEqual(len(self.sent), 1)
+ self.receive()
+ self.receive(stop=True)
+
+ def testNotifyQueryOnce(self):
+ self.start_handler()
+ self.notify_query()
+ self.receive()
+ self.notify_query(stop=True)
+ self.receive()
+
+ def testNotifyWithArgs(self):
+ arg_dict = {'test': 42, 'more': 43, 'less': 41}
+ self.start_handler('test_args', arg_dict)
+ self.notify_query()
+ self.receive(stop=True)
+
+ def testNotifySeveralTimes(self):
+ arg_dict = {'test': 1}
+ self.start_handler(arg_dict=arg_dict)
+ for count in range(3):
+ self.notify_query()
+ self.receive()
+ arg_dict['test'] += 1
+ for count in range(2):
+ self.notify_handler()
+ self.receive()
+ arg_dict['test'] += 1
+ for count in range(3):
+ self.notify_query()
+ self.receive(stop=True)
+
+ def testNotifyOnceWithPayload(self):
+ self.start_handler()
+ self.notify_query(payload='test_payload')
+ self.receive(stop=True)
+
+ def testNotifyWithArgsAndPayload(self):
+ self.start_handler(arg_dict={'foo': 'bar'})
+ self.notify_query(payload='baz')
+ self.receive(stop=True)
+
+ def testNotifyQuotedNames(self):
+ self.start_handler('Hello, World!')
+ self.notify_query(payload='How do you do?')
+ self.receive(stop=True)
+
+ def testNotifyWithFivePayloads(self):
+ self.start_handler('gimme_5', {'test': 'Gimme 5'})
+ for count in range(5):
+ self.notify_query(payload="Round %d" % count)
+ self.assertEqual(len(self.sent), 5)
+ self.receive(stop=True)
+
+ def testReceiveImmediately(self):
+ self.start_handler('immediate', {'test': 'immediate'})
+ for count in range(3):
+ self.notify_query(payload="Round %d" % count)
+ self.receive()
+ self.receive(stop=True)
+
+ def testNotifyDistinctInTransaction(self):
+ self.start_handler('test_transaction', {'transaction': True})
+ self.db.begin()
+ for count in range(3):
+ self.notify_query(payload='Round %d' % count)
+ self.db.commit()
+ self.receive(stop=True)
+
+ def testNotifySameInTransaction(self):
+ self.start_handler('test_transaction', {'transaction': True})
+ self.db.begin()
+ for count in range(3):
+ self.notify_query()
+ self.db.commit()
+ # these same notifications may be delivered as one,
+ # so we must not wait for all three to appear
+ self.sent = self.sent[:1]
+ self.receive(stop=True)
+
+ def testNotifyNoTimeout(self):
+ self.start_handler(timeout=None)
+ self.assertIsNone(self.handler.timeout)
+ self.assertTrue(self.handler.listening)
+ sleep(0.02)
+ self.assertFalse(self.timeout)
+ self.receive(stop=True)
+
+ def testNotifyZeroTimeout(self):
+ self.start_handler(timeout=0)
+ self.assertEqual(self.handler.timeout, 0)
+ self.assertTrue(self.handler.listening)
+ self.assertFalse(self.timeout)
+
+ def testNotifyWithoutTimeout(self):
+ self.start_handler(timeout=1)
+ self.assertEqual(self.handler.timeout, 1)
+ sleep(0.02)
+ self.assertFalse(self.timeout)
+ self.receive(stop=True)
+
+ def testNotifyWithTimeout(self):
+ self.start_handler(timeout=0.01)
+ sleep(0.02)
+ self.assertTrue(self.timeout)
+
+
+if __name__ == '__main__':
+ unittest.main()
Added: trunk/tests/test_classic_notification.py
==============================================================================
--- /dev/null 00:00:00 1970 (empty, because file is newly added)
+++ trunk/tests/test_classic_notification.py Wed Jan 27 13:26:41 2016
(r790)
@@ -0,0 +1,447 @@
+#! /usr/bin/python
+# -*- coding: utf-8 -*-
+
+"""Test the classic PyGreSQL interface.
+
+Sub-tests for the notification handler object.
+
+Contributed by Christoph Zwerschke.
+
+These tests need a database to test against.
+"""
+
+try:
+ import unittest2 as unittest # for Python < 2.7
+except ImportError:
+ import unittest
+
+import warnings
+from time import sleep
+from threading import Thread
+
+import pg # the module under test
+
+# We need a database to test against. If LOCAL_PyGreSQL.py exists we will
+# get our information from that. Otherwise we use the defaults.
+# The current user must have create schema privilege on the database.
+dbname = 'unittest'
+dbhost = None
+dbport = 5432
+
+debug = False # let DB wrapper print debugging output
+
+try:
+ from .LOCAL_PyGreSQL import *
+except (ImportError, ValueError):
+ try:
+ from LOCAL_PyGreSQL import *
+ except ImportError:
+ pass
+
+
+def DB():
+ """Create a DB wrapper object connecting to the test database."""
+ db = pg.DB(dbname, dbhost, dbport)
+ if debug:
+ db.debug = debug
+ return db
+
+
+class TestPyNotifyAlias(unittest.TestCase):
+ """Test alternative ways of creating a NotificationHandler."""
+
+ def callback(self):
+ self.fail('Callback should not be called in this test')
+
+ def testPgNotify(self):
+ db = DB()
+ arg_dict = {}
+ args = ('test_event', self.callback, arg_dict)
+ kwargs = dict(timeout=2, stop_event='test_stop')
+ with warnings.catch_warnings(record=True) as warn_msgs:
+ warnings.simplefilter("always")
+ handler1 = pg.pgnotify(db, *args, **kwargs)
+ assert len(warn_msgs) == 1
+ warn_msg = warn_msgs[0]
+ assert issubclass(warn_msg.category, DeprecationWarning)
+ assert 'deprecated' in str(warn_msg.message)
+ self.assertIsInstance(handler1, pg.NotificationHandler)
+ handler2 = db.notification_handler(*args, **kwargs)
+ self.assertIsInstance(handler2, pg.NotificationHandler)
+ self.assertIs(handler1.db, handler2.db)
+ self.assertEqual(handler1.event, handler2.event)
+ self.assertIs(handler1.callback, handler2.callback)
+ self.assertIs(handler1.arg_dict, handler2.arg_dict)
+ self.assertEqual(handler1.timeout, handler2.timeout)
+ self.assertEqual(handler1.stop_event, handler2.stop_event)
+
+
+class TestSyncNotification(unittest.TestCase):
+ """Test notification handler running in the same thread."""
+
+ def setUp(self):
+ self.db = DB()
+ self.timeout = None
+ self.called = True
+ self.payloads = []
+
+ def tearDown(self):
+ if self.db:
+ self.db.close()
+
+ def callback(self, arg_dict):
+ if arg_dict is None:
+ self.timeout = True
+ else:
+ self.timeout = False
+ self.payloads.append(arg_dict.get('extra'))
+
+ def get_handler(self, event=None, arg_dict=None, stop_event=None):
+ if not event:
+ event = 'test_async_notification'
+ if not stop_event:
+ stop_event = 'stop_async_notification'
+ callback = self.callback
+ handler = self.db.notification_handler(
+ event, callback, arg_dict, 0, stop_event)
+ self.assertEqual(handler.event, event)
+ self.assertEqual(handler.stop_event, stop_event or 'stop_%s' % event)
+ self.assertIs(handler.callback, callback)
+ if arg_dict is None:
+ self.assertEqual(handler.arg_dict, {})
+ else:
+ self.assertIs(handler.arg_dict, arg_dict)
+ self.assertEqual(handler.timeout, 0)
+ self.assertFalse(handler.listening)
+ return handler
+
+ def testCloseHandler(self):
+ handler = self.get_handler()
+ self.assertIs(handler.db, self.db)
+ handler.close()
+ self.assertRaises(pg.InternalError, self.db.close)
+ self.db = None
+ self.assertIs(handler.db, None)
+
+ def testDeleteHandler(self):
+ handler = self.get_handler('test_del')
+ self.assertIs(handler.db, self.db)
+ handler.listen()
+ self.db.query('notify test_del')
+ self.db.query('notify test_del')
+ del handler
+ self.db.query('notify test_del')
+ n = 0
+ while self.db.getnotify() and n < 4:
+ n += 1
+ self.assertEqual(n, 2)
+
+ def testNotify(self):
+ handler = self.get_handler()
+ handler.listen()
+ self.assertRaises(TypeError, handler.notify, invalid=True)
+ handler.notify(payload='baz')
+ handler.notify(stop=True, payload='buz')
+ handler.unlisten()
+ self.db.close()
+ self.db = None
+
+ def testNotifyWithArgsAndPayload(self):
+ arg_dict = {'foo': 'bar'}
+ handler = self.get_handler(arg_dict=arg_dict)
+ self.assertEqual(handler.timeout, 0)
+ handler.listen()
+ handler.notify(payload='baz')
+ handler.notify(payload='biz')
+ handler()
+ self.assertIsNotNone(self.timeout)
+ self.assertFalse(self.timeout)
+ self.assertEqual(self.payloads, ['baz', 'biz'])
+ self.assertEqual(arg_dict['foo'], 'bar')
+ self.assertEqual(arg_dict['event'], handler.event)
+ self.assertIsInstance(arg_dict['pid'], int)
+ self.assertEqual(arg_dict['extra'], 'biz')
+ self.assertTrue(handler.listening)
+ del self.payloads[:]
+ handler.notify(stop=True, payload='buz')
+ handler()
+ self.assertIsNotNone(self.timeout)
+ self.assertFalse(self.timeout)
+ self.assertEqual(self.payloads, ['buz'])
+ self.assertEqual(arg_dict['foo'], 'bar')
+ self.assertEqual(arg_dict['event'], handler.stop_event)
+ self.assertIsInstance(arg_dict['pid'], int)
+ self.assertEqual(arg_dict['extra'], 'buz')
+ self.assertFalse(handler.listening)
+ handler.unlisten()
+
+ def testNotifyWrongEvent(self):
+ handler = self.get_handler('good_event')
+ self.assertEqual(handler.timeout, 0)
+ handler.listen()
+ handler.notify(payload="note 1")
+ self.db.query("notify bad_event, 'note 2'")
+ handler.notify(payload="note 3")
+ handler()
+ self.assertIsNotNone(self.timeout)
+ self.assertFalse(self.timeout)
+ self.assertEqual(self.payloads, ['note 1', 'note 3'])
+ self.assertTrue(handler.listening)
+ del self.payloads[:]
+ self.db.query('listen bad_event')
+ handler.notify(payload="note 4")
+ self.db.query("notify bad_event, 'note 5'")
+ handler.notify(payload="note 6")
+ try:
+ handler()
+ except pg.DatabaseError as error:
+ self.assertEqual(str(error),
+ 'Listening for "good_event" and "stop_good_event",'
+ ' but notified of "bad_event"')
+ self.assertIsNotNone(self.timeout)
+ self.assertFalse(self.timeout)
+ self.assertEqual(self.payloads, ['note 4'])
+ self.assertFalse(handler.listening)
+
+
+class TestAsyncNotification(unittest.TestCase):
+ """Test notification handler running in a separate thread."""
+
+ def setUp(self):
+ self.db = DB()
+
+ def tearDown(self):
+ self.doCleanups()
+ if self.db:
+ self.db.close()
+
+ def callback(self, arg_dict):
+ if arg_dict is None:
+ self.timeout = True
+ elif arg_dict is self.arg_dict:
+ arg_dict = arg_dict.copy()
+ pid = arg_dict.get('pid')
+ if isinstance(pid, int):
+ arg_dict['pid'] = 1
+ self.received.append(arg_dict)
+ else:
+ self.received.append(dict(error=arg_dict))
+
+ def start_handler(self,
+ event=None, arg_dict=None, timeout=5, stop_event=None):
+ db = DB()
+ if not event:
+ event = 'test_async_notification'
+ if not stop_event:
+ stop_event = 'stop_async_notification'
+ callback = self.callback
+ handler = db.notification_handler(
+ event, callback, arg_dict, timeout, stop_event)
+ self.handler = handler
+ self.assertIsInstance(handler, pg.NotificationHandler)
+ self.assertEqual(handler.event, event)
+ self.assertEqual(handler.stop_event, stop_event or 'stop_%s' % event)
+ self.event = handler.event
+ self.assertIs(handler.callback, callback)
+ if arg_dict is None:
+ self.assertEqual(handler.arg_dict, {})
+ else:
+ self.assertIsInstance(handler.arg_dict, dict)
+ self.arg_dict = handler.arg_dict
+ self.assertEqual(handler.timeout, timeout)
+ self.assertFalse(handler.listening)
+ thread = Thread(target=handler, name='test_notification_thread')
+ self.thread = thread
+ thread.start()
+ self.stopped = timeout == 0
+ self.addCleanup(self.stop_handler)
+ for n in range(500):
+ if handler.listening:
+ break
+ sleep(0.01)
+ self.assertTrue(handler.listening)
+ if not self.stopped:
+ self.assertTrue(thread.is_alive())
+ self.timeout = False
+ self.received = []
+ self.sent = []
+
+ def stop_handler(self):
+ handler = self.handler
+ thread = self.thread
+ if not self.stopped and self.handler.listening:
+ self.notify_handler(stop=True)
+ handler.close()
+ self.db = None
+ if thread.is_alive():
+ thread.join(5)
+ self.assertFalse(handler.listening)
+ self.assertFalse(thread.is_alive())
+
+ def notify_handler(self, stop=False, payload=None):
+ event = self.event
+ if stop:
+ event = self.handler.stop_event
+ self.stopped = True
+ arg_dict = self.arg_dict.copy()
+ arg_dict.update(event=event, pid=1, extra=payload or '')
+ self.handler.notify(db=self.db, stop=stop, payload=payload)
+ self.sent.append(arg_dict)
+
+ def notify_query(self, stop=False, payload=None):
+ event = self.event
+ if stop:
+ event = self.handler.stop_event
+ self.stopped = True
+ q = 'notify "%s"' % event
+ if payload:
+ q += ", '%s'" % payload
+ arg_dict = self.arg_dict.copy()
+ arg_dict.update(event=event, pid=1, extra=payload or '')
+ self.db.query(q)
+ self.sent.append(arg_dict)
+
+ def wait(self):
+ for n in range(500):
+ if self.timeout:
+ return False
+ if len(self.received) >= len(self.sent):
+ return True
+ sleep(0.01)
+
+ def receive(self, stop=False):
+ if not self.sent:
+ stop = True
+ if stop:
+ self.notify_handler(stop=True, payload='stop')
+ self.assertTrue(self.wait())
+ self.assertFalse(self.timeout)
+ self.assertEqual(self.received, self.sent)
+ self.received = []
+ self.sent = []
+ self.assertEqual(self.handler.listening, not self.stopped)
+
+ def testNotifyHandlerEmpty(self):
+ self.start_handler()
+ self.notify_handler(stop=True)
+ self.assertEqual(len(self.sent), 1)
+ self.receive()
+
+ def testNotifyQueryEmpty(self):
+ self.start_handler()
+ self.notify_query(stop=True)
+ self.assertEqual(len(self.sent), 1)
+ self.receive()
+
+ def testNotifyHandlerOnce(self):
+ self.start_handler()
+ self.notify_handler()
+ self.assertEqual(len(self.sent), 1)
+ self.receive()
+ self.receive(stop=True)
+
+ def testNotifyQueryOnce(self):
+ self.start_handler()
+ self.notify_query()
+ self.receive()
+ self.notify_query(stop=True)
+ self.receive()
+
+ def testNotifyWithArgs(self):
+ arg_dict = {'test': 42, 'more': 43, 'less': 41}
+ self.start_handler('test_args', arg_dict)
+ self.notify_query()
+ self.receive(stop=True)
+
+ def testNotifySeveralTimes(self):
+ arg_dict = {'test': 1}
+ self.start_handler(arg_dict=arg_dict)
+ for count in range(3):
+ self.notify_query()
+ self.receive()
+ arg_dict['test'] += 1
+ for count in range(2):
+ self.notify_handler()
+ self.receive()
+ arg_dict['test'] += 1
+ for count in range(3):
+ self.notify_query()
+ self.receive(stop=True)
+
+ def testNotifyOnceWithPayload(self):
+ self.start_handler()
+ self.notify_query(payload='test_payload')
+ self.receive(stop=True)
+
+ def testNotifyWithArgsAndPayload(self):
+ self.start_handler(arg_dict={'foo': 'bar'})
+ self.notify_query(payload='baz')
+ self.receive(stop=True)
+
+ def testNotifyQuotedNames(self):
+ self.start_handler('Hello, World!')
+ self.notify_query(payload='How do you do?')
+ self.receive(stop=True)
+
+ def testNotifyWithFivePayloads(self):
+ self.start_handler('gimme_5', {'test': 'Gimme 5'})
+ for count in range(5):
+ self.notify_query(payload="Round %d" % count)
+ self.assertEqual(len(self.sent), 5)
+ self.receive(stop=True)
+
+ def testReceiveImmediately(self):
+ self.start_handler('immediate', {'test': 'immediate'})
+ for count in range(3):
+ self.notify_query(payload="Round %d" % count)
+ self.receive()
+ self.receive(stop=True)
+
+ def testNotifyDistinctInTransaction(self):
+ self.start_handler('test_transaction', {'transaction': True})
+ self.db.begin()
+ for count in range(3):
+ self.notify_query(payload='Round %d' % count)
+ self.db.commit()
+ self.receive(stop=True)
+
+ def testNotifySameInTransaction(self):
+ self.start_handler('test_transaction', {'transaction': True})
+ self.db.begin()
+ for count in range(3):
+ self.notify_query()
+ self.db.commit()
+ # these same notifications may be delivered as one,
+ # so we must not wait for all three to appear
+ self.sent = self.sent[:1]
+ self.receive(stop=True)
+
+ def testNotifyNoTimeout(self):
+ self.start_handler(timeout=None)
+ self.assertIsNone(self.handler.timeout)
+ self.assertTrue(self.handler.listening)
+ sleep(0.02)
+ self.assertFalse(self.timeout)
+ self.receive(stop=True)
+
+ def testNotifyZeroTimeout(self):
+ self.start_handler(timeout=0)
+ self.assertEqual(self.handler.timeout, 0)
+ self.assertTrue(self.handler.listening)
+ self.assertFalse(self.timeout)
+
+ def testNotifyWithoutTimeout(self):
+ self.start_handler(timeout=1)
+ self.assertEqual(self.handler.timeout, 1)
+ sleep(0.02)
+ self.assertFalse(self.timeout)
+ self.receive(stop=True)
+
+ def testNotifyWithTimeout(self):
+ self.start_handler(timeout=0.01)
+ sleep(0.02)
+ self.assertTrue(self.timeout)
+
+
+if __name__ == '__main__':
+ unittest.main()
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql