Author: cito
Date: Sun Jan 17 11:32:18 2016
New Revision: 762
Log:
Docs and 100% test coverage for NotificationHandler
Modified:
branches/4.x/docs/contents/changelog.rst
branches/4.x/docs/contents/pg/db_wrapper.rst
branches/4.x/docs/contents/pg/index.rst
branches/4.x/pg.py
branches/4.x/tests/test_classic_dbwrapper.py
trunk/docs/contents/changelog.rst
trunk/docs/contents/pg/db_wrapper.rst
trunk/docs/contents/pg/index.rst
trunk/pg.py
trunk/tests/test_classic.py
trunk/tests/test_classic_dbwrapper.py
Modified: branches/4.x/docs/contents/changelog.rst
==============================================================================
--- branches/4.x/docs/contents/changelog.rst Sun Jan 17 11:19:32 2016
(r761)
+++ branches/4.x/docs/contents/changelog.rst Sun Jan 17 11:32:18 2016
(r762)
@@ -17,12 +17,14 @@
- get_tables() does not list information schema tables any more.
- Fix notification handler (Thanks Patrick TJ McPhee).
- Fix a small issue with large objects.
+- Minor improvements in the NotificationHandler.
+- Converted documentation to Sphinx and added many missing parts.
- The tutorial files have become a chapter in the documentation.
-- Greatly improve unit testing, tests run with Python 2.4 to 2.7 again.
+- Greatly improved unit testing, tests run with Python 2.4 to 2.7 again.
Version 4.1.1 (2013-01-08)
--------------------------
-- Add WhenNotified class and method. Replaces need for third party pgnotify.
+- Add NotificationHandler class and method. Replaces need for pgnotify.
- Sharpen test for inserting current_timestamp.
- Add more quote tests. False and 0 should evaluate to NULL.
- More tests - Any number other than 0 is True.
Modified: branches/4.x/docs/contents/pg/db_wrapper.rst
==============================================================================
--- branches/4.x/docs/contents/pg/db_wrapper.rst Sun Jan 17 11:19:32
2016 (r761)
+++ branches/4.x/docs/contents/pg/db_wrapper.rst Sun Jan 17 11:32:18
2016 (r762)
@@ -529,3 +529,22 @@
currently regular type names are used.
.. versionadded:: 4.1
+
+notification_handler -- create a notification handler
+-----------------------------------------------------
+
+.. method:: DB.notification_handler(event, callback, [arg_dict], [timeout],
[stop_event])
+
+ Create a notification handler instance
+
+ :param str event: the name of an event to listen for
+ :param callback: a callback function
+ :param dict arg_dict: an optional dictionary for passing arguments
+ :param timeout: the time-out when waiting for notifications
+ :type timeout: int, float or None
+ :param str stop_event: an optional different name to be used as stop event
+
+This method creates a :class:`pg.NotificationHandler` object using the
+:class:`DB` connection as explained under :doc:`notification`.
+
+.. versionadded:: 4.1.1
Modified: branches/4.x/docs/contents/pg/index.rst
==============================================================================
--- branches/4.x/docs/contents/pg/index.rst Sun Jan 17 11:19:32 2016
(r761)
+++ branches/4.x/docs/contents/pg/index.rst Sun Jan 17 11:32:18 2016
(r762)
@@ -14,3 +14,4 @@
db_wrapper
query
large_objects
+ notification
Modified: branches/4.x/pg.py
==============================================================================
--- branches/4.x/pg.py Sun Jan 17 11:19:32 2016 (r761)
+++ branches/4.x/pg.py Sun Jan 17 11:32:18 2016 (r762)
@@ -143,23 +143,28 @@
class NotificationHandler(object):
"""A PostgreSQL client-side asynchronous notification handler."""
- def __init__(self, db, event, callback, arg_dict=None, timeout=None):
+ def __init__(self, db, event, callback=None,
+ arg_dict=None, timeout=None, stop_event=None):
"""Initialize the notification handler.
- db - PostgreSQL connection object.
- event - Event (notification channel) to LISTEN for.
- callback - Event callback function.
- arg_dict - A dictionary passed as the argument to the callback.
- timeout - Timeout in seconds; a floating point number denotes
- fractions of seconds. If it is absent or None, the
- callers will never time out.
+ You must pass a PyGreSQL database connection, the name of an
+ event (notification channel) to listen for and a callback function.
+ You can also specify a dictionary arg_dict that will be passed as
+ the single argument to the callback function, and a timeout value
+ in seconds (a floating point number denotes fractions of seconds).
+ If it is absent or None, the callers will never time out. If the
+ timeout is reached, the callback function will be called with a
+ single argument that is None. If you set the timeout to zero,
+ the handler will poll notifications synchronously and return.
+
+ You can specify the name of the event that will be used to signal
+ the handler to stop listening as stop_event. By default, it will
+ be the event name prefixed with 'stop_'.
"""
- if isinstance(db, DB):
- db = db.db
self.db = db
self.event = event
- self.stop_event = 'stop_%s' % event
+ self.stop_event = stop_event or 'stop_%s' % event
self.listening = False
self.callback = callback
if arg_dict is None:
@@ -168,7 +173,7 @@
self.timeout = timeout
def __del__(self):
- self.close()
+ self.unlisten()
def close(self):
"""Stop listening and close the connection."""
@@ -194,42 +199,47 @@
def notify(self, db=None, stop=False, payload=None):
"""Generate a notification.
- Note: If the main loop is running in another thread, you must pass
- a different database connection to avoid a collision.
+ Optionally, you can pass a payload with the notification.
- The payload parameter is only supported in PostgreSQL >= 9.0.
+ If you set the stop flag, a stop notification will be sent that
+ will cause the handler to stop listening.
+ Note: If the notification handler is running in another thread, you
+ must pass a different database connection since PyGreSQL database
+ connections are not thread-safe.
"""
- if not db:
- db = self.db
if self.listening:
+ if not db:
+ db = self.db
q = 'notify "%s"' % (stop and self.stop_event or self.event)
if payload:
q += ", '%s'" % payload
return db.query(q)
- def __call__(self, close=False):
+ def __call__(self):
"""Invoke the notification handler.
- The handler is a loop that actually LISTENs for two NOTIFY messages:
-
- <event> and stop_<event>.
-
- When either of these NOTIFY messages are received, its associated
- 'pid' and 'event' are inserted into <arg_dict>, and the callback is
- invoked with <arg_dict>. If the NOTIFY message is stop_<event>, the
- handler UNLISTENs both <event> and stop_<event> and exits.
+ The handler is a loop that listens for notifications on the event
+ and stop event channels. When either of these notifications is
+ received, its associated 'pid', 'event' and 'extra' (the payload
+ passed with the notification) are inserted into its arg_dict
+ dictionary and the callback is invoked with this dictionary as
+ a single argument. When the handler receives a stop event, it
+ stops listening to both events and returns.
+
+ In the special case that the timeout of the handler has been set
+ to zero, the handler will poll all events synchronously and return.
+ It will keep listening until it receives a stop event.
Note: If you run this loop in another thread, don't use the same
database connection for database operations in the main thread.
-
"""
self.listen()
- _ilist = [self.db.fileno()]
-
+ poll = self.timeout == 0
+ if not poll:
+ rlist = [self.db.fileno()]
while self.listening:
- ilist, _olist, _elist = select.select(_ilist, [], [], self.timeout)
- if ilist:
+ if poll or select.select(rlist, [], [], self.timeout)[0]:
while self.listening:
notice = self.db.getnotify()
if not notice: # no more messages
@@ -238,14 +248,14 @@
if event not in (self.event, self.stop_event):
self.unlisten()
raise _db_error(
- 'listening for "%s" and "%s", but notified of "%s"'
+ 'Listening for "%s" and "%s", but notified of "%s"'
% (self.event, self.stop_event, event))
if event == self.stop_event:
self.unlisten()
- self.arg_dict['pid'] = pid
- self.arg_dict['event'] = event
- self.arg_dict['extra'] = extra
+ self.arg_dict.update(pid=pid, event=event, extra=extra)
self.callback(self.arg_dict)
+ if poll:
+ break
else: # we timed out
self.unlisten()
self.callback(None)
@@ -1155,9 +1165,11 @@
self._do_debug(q)
return self.query(q)
- def notification_handler(self, event, callback, arg_dict={}, timeout=None):
+ def notification_handler(self,
+ event, callback, arg_dict=None, timeout=None, stop_event=None):
"""Get notification handler that will run the given callback."""
- return NotificationHandler(self.db, event, callback, arg_dict, timeout)
+ return NotificationHandler(self,
+ event, callback, arg_dict, timeout, stop_event)
# if run as script, print some information
Modified: branches/4.x/tests/test_classic_dbwrapper.py
==============================================================================
--- branches/4.x/tests/test_classic_dbwrapper.py Sun Jan 17 11:19:32
2016 (r761)
+++ branches/4.x/tests/test_classic_dbwrapper.py Sun Jan 17 11:32:18
2016 (r762)
@@ -225,6 +225,26 @@
self.assertRaises(pg.InternalError, self.db.close)
self.assertRaises(pg.InternalError, self.db.query, 'select 1')
+ def testMethodReset(self):
+ con = self.db.db
+ self.db.reset()
+ self.assertIs(self.db.db, con)
+ self.db.query("select 1+1")
+ self.db.close()
+ self.assertRaises(pg.InternalError, self.db.reset)
+
+ def testMethodReopen(self):
+ con = self.db.db
+ self.db.reopen()
+ self.assertIsNot(self.db.db, con)
+ con = self.db.db
+ self.db.query("select 1+1")
+ self.db.close()
+ self.db.reopen()
+ self.assertIsNot(self.db.db, con)
+ self.db.query("select 1+1")
+ self.db.close()
+
def testExistingConnection(self):
db = pg.DB(self.db.db)
self.assertEqual(self.db.db, db.db)
@@ -1433,6 +1453,83 @@
self.assertEqual(r, s)
query('drop table bytea_test')
+ def testNotificationHandler(self):
+ # the notification handler itself is tested separately
+ f = self.db.notification_handler
+ callback = lambda arg_dict: None
+ handler = f('test', callback)
+ self.assertIsInstance(handler, pg.NotificationHandler)
+ self.assertIs(handler.db, self.db)
+ self.assertEqual(handler.event, 'test')
+ self.assertEqual(handler.stop_event, 'stop_test')
+ self.assertIs(handler.callback, callback)
+ self.assertIsInstance(handler.arg_dict, dict)
+ self.assertEqual(handler.arg_dict, {})
+ self.assertIsNone(handler.timeout)
+ self.assertFalse(handler.listening)
+ handler.close()
+ self.assertIsNone(handler.db)
+ self.db.reopen()
+ self.assertIsNone(handler.db)
+ handler = f('test2', callback, timeout=2)
+ self.assertIsInstance(handler, pg.NotificationHandler)
+ self.assertIs(handler.db, self.db)
+ self.assertEqual(handler.event, 'test2')
+ self.assertEqual(handler.stop_event, 'stop_test2')
+ self.assertIs(handler.callback, callback)
+ self.assertIsInstance(handler.arg_dict, dict)
+ self.assertEqual(handler.arg_dict, {})
+ self.assertEqual(handler.timeout, 2)
+ self.assertFalse(handler.listening)
+ handler.close()
+ self.assertIsNone(handler.db)
+ self.db.reopen()
+ self.assertIsNone(handler.db)
+ arg_dict = {'testing': 3}
+ handler = f('test3', callback, arg_dict=arg_dict)
+ self.assertIsInstance(handler, pg.NotificationHandler)
+ self.assertIs(handler.db, self.db)
+ self.assertEqual(handler.event, 'test3')
+ self.assertEqual(handler.stop_event, 'stop_test3')
+ self.assertIs(handler.callback, callback)
+ self.assertIs(handler.arg_dict, arg_dict)
+ self.assertEqual(arg_dict['testing'], 3)
+ self.assertIsNone(handler.timeout)
+ self.assertFalse(handler.listening)
+ handler.close()
+ self.assertIsNone(handler.db)
+ self.db.reopen()
+ self.assertIsNone(handler.db)
+ handler = f('test4', callback, stop_event='stop4')
+ self.assertIsInstance(handler, pg.NotificationHandler)
+ self.assertIs(handler.db, self.db)
+ self.assertEqual(handler.event, 'test4')
+ self.assertEqual(handler.stop_event, 'stop4')
+ self.assertIs(handler.callback, callback)
+ self.assertIsInstance(handler.arg_dict, dict)
+ self.assertEqual(handler.arg_dict, {})
+ self.assertIsNone(handler.timeout)
+ self.assertFalse(handler.listening)
+ handler.close()
+ self.assertIsNone(handler.db)
+ self.db.reopen()
+ self.assertIsNone(handler.db)
+ arg_dict = {'testing': 5}
+ handler = f('test5', callback, arg_dict, 1.5, 'stop5')
+ self.assertIsInstance(handler, pg.NotificationHandler)
+ self.assertIs(handler.db, self.db)
+ self.assertEqual(handler.event, 'test5')
+ self.assertEqual(handler.stop_event, 'stop5')
+ self.assertIs(handler.callback, callback)
+ self.assertIs(handler.arg_dict, arg_dict)
+ self.assertEqual(arg_dict['testing'], 5)
+ self.assertEqual(handler.timeout, 1.5)
+ self.assertFalse(handler.listening)
+ handler.close()
+ self.assertIsNone(handler.db)
+ self.db.reopen()
+ self.assertIsNone(handler.db)
+
def testDebugWithCallable(self):
if debug:
self.assertEqual(self.db.debug, debug)
@@ -1455,7 +1552,6 @@
def setUpClass(cls):
db = DB()
query = db.query
- query("set client_min_messages=warning")
for num_schema in range(5):
if num_schema:
schema = "s%d" % num_schema
@@ -1480,7 +1576,6 @@
def tearDownClass(cls):
db = DB()
query = db.query
- query("set client_min_messages=warning")
for num_schema in range(5):
if num_schema:
schema = "s%d" % num_schema
@@ -1493,7 +1588,6 @@
def setUp(self):
self.db = DB()
- self.db.query("set client_min_messages=warning")
def tearDown(self):
self.db.close()
Modified: trunk/docs/contents/changelog.rst
==============================================================================
--- trunk/docs/contents/changelog.rst Sun Jan 17 11:19:32 2016 (r761)
+++ trunk/docs/contents/changelog.rst Sun Jan 17 11:32:18 2016 (r762)
@@ -56,12 +56,14 @@
- get_tables() does not list information schema tables any more.
- Fix notification handler (Thanks Patrick TJ McPhee).
- Fix a small issue with large objects.
+- Minor improvements of the NotificationHandler.
+- Converted documentation to Sphinx and added many missing parts.
- The tutorial files have become a chapter in the documentation.
-- Greatly improve unit testing, tests run with Python 2.4 to 2.7 again.
+- Greatly improved unit testing, tests run with Python 2.4 to 2.7 again.
Version 4.1.1 (2013-01-08)
--------------------------
-- Add WhenNotified class and method. Replaces need for third party pgnotify.
+- Add NotificationHandler class and method. Replaces need for pgnotify.
- Sharpen test for inserting current_timestamp.
- Add more quote tests. False and 0 should evaluate to NULL.
- More tests - Any number other than 0 is True.
Modified: trunk/docs/contents/pg/db_wrapper.rst
==============================================================================
--- trunk/docs/contents/pg/db_wrapper.rst Sun Jan 17 11:19:32 2016
(r761)
+++ trunk/docs/contents/pg/db_wrapper.rst Sun Jan 17 11:32:18 2016
(r762)
@@ -605,3 +605,22 @@
currently regular type names are used.
.. versionadded:: 4.1
+
+notification_handler -- create a notification handler
+-----------------------------------------------------
+
+.. method:: DB.notification_handler(event, callback, [arg_dict], [timeout],
[stop_event])
+
+ Create a notification handler instance
+
+ :param str event: the name of an event to listen for
+ :param callback: a callback function
+ :param dict arg_dict: an optional dictionary for passing arguments
+ :param timeout: the time-out when waiting for notifications
+ :type timeout: int, float or None
+ :param str stop_event: an optional different name to be used as stop event
+
+This method creates a :class:`pg.NotificationHandler` object using the
+:class:`DB` connection as explained under :doc:`notification`.
+
+.. versionadded:: 4.1.1
Modified: trunk/docs/contents/pg/index.rst
==============================================================================
--- trunk/docs/contents/pg/index.rst Sun Jan 17 11:19:32 2016 (r761)
+++ trunk/docs/contents/pg/index.rst Sun Jan 17 11:32:18 2016 (r762)
@@ -14,3 +14,4 @@
db_wrapper
query
large_objects
+ notification
Modified: trunk/pg.py
==============================================================================
--- trunk/pg.py Sun Jan 17 11:19:32 2016 (r761)
+++ trunk/pg.py Sun Jan 17 11:32:18 2016 (r762)
@@ -105,22 +105,28 @@
class NotificationHandler(object):
"""A PostgreSQL client-side asynchronous notification handler."""
- def __init__(self, db, event, callback, arg_dict=None, timeout=None):
+ def __init__(self, db, event, callback=None,
+ arg_dict=None, timeout=None, stop_event=None):
"""Initialize the notification handler.
- db - PostgreSQL connection object.
- event - Event (notification channel) to LISTEN for.
- callback - Event callback function.
- arg_dict - A dictionary passed as the argument to the callback.
- timeout - Timeout in seconds; a floating point number denotes
- fractions of seconds. If it is absent or None, the
- callers will never time out.
+ You must pass a PyGreSQL database connection, the name of an
+ event (notification channel) to listen for and a callback function.
+
+ You can also specify a dictionary arg_dict that will be passed as
+ the single argument to the callback function, and a timeout value
+ in seconds (a floating point number denotes fractions of seconds).
+ If it is absent or None, the callers will never time out. If the
+ timeout is reached, the callback function will be called with a
+ single argument that is None. If you set the timeout to zero,
+ the handler will poll notifications synchronously and return.
+
+ You can specify the name of the event that will be used to signal
+ the handler to stop listening as stop_event. By default, it will
+ be the event name prefixed with 'stop_'.
"""
- if isinstance(db, DB):
- db = db.db
self.db = db
self.event = event
- self.stop_event = 'stop_%s' % event
+ self.stop_event = stop_event or 'stop_%s' % event
self.listening = False
self.callback = callback
if arg_dict is None:
@@ -129,7 +135,7 @@
self.timeout = timeout
def __del__(self):
- self.close()
+ self.unlisten()
def close(self):
"""Stop listening and close the connection."""
@@ -155,38 +161,47 @@
def notify(self, db=None, stop=False, payload=None):
"""Generate a notification.
- Note: If the main loop is running in another thread, you must pass
- a different database connection to avoid a collision.
+ Optionally, you can pass a payload with the notification.
+
+ If you set the stop flag, a stop notification will be sent that
+ will cause the handler to stop listening.
+
+ Note: If the notification handler is running in another thread, you
+ must pass a different database connection since PyGreSQL database
+ connections are not thread-safe.
"""
- if not db:
- db = self.db
if self.listening:
+ if not db:
+ db = self.db
q = 'notify "%s"' % (self.stop_event if stop else self.event)
if payload:
q += ", '%s'" % payload
return db.query(q)
- def __call__(self, close=False):
+ def __call__(self):
"""Invoke the notification handler.
- The handler is a loop that actually LISTENs for two NOTIFY messages:
-
- <event> and stop_<event>.
-
- When either of these NOTIFY messages are received, its associated
- 'pid' and 'event' are inserted into <arg_dict>, and the callback is
- invoked with <arg_dict>. If the NOTIFY message is stop_<event>, the
- handler UNLISTENs both <event> and stop_<event> and exits.
+ The handler is a loop that listens for notifications on the event
+ and stop event channels. When either of these notifications is
+ received, its associated 'pid', 'event' and 'extra' (the payload
+ passed with the notification) are inserted into its arg_dict
+ dictionary and the callback is invoked with this dictionary as
+ a single argument. When the handler receives a stop event, it
+ stops listening to both events and returns.
+
+ In the special case that the timeout of the handler has been set
+ to zero, the handler will poll all events synchronously and return.
+ It will keep listening until it receives a stop event.
Note: If you run this loop in another thread, don't use the same
database connection for database operations in the main thread.
"""
self.listen()
- _ilist = [self.db.fileno()]
-
+ poll = self.timeout == 0
+ if not poll:
+ rlist = [self.db.fileno()]
while self.listening:
- ilist, _olist, _elist = select.select(_ilist, [], [], self.timeout)
- if ilist:
+ if poll or select.select(rlist, [], [], self.timeout)[0]:
while self.listening:
notice = self.db.getnotify()
if not notice: # no more messages
@@ -199,10 +214,10 @@
% (self.event, self.stop_event, event))
if event == self.stop_event:
self.unlisten()
- self.arg_dict['pid'] = pid
- self.arg_dict['event'] = event
- self.arg_dict['extra'] = extra
+ self.arg_dict.update(pid=pid, event=event, extra=extra)
self.callback(self.arg_dict)
+ if poll:
+ break
else: # we timed out
self.unlisten()
self.callback(None)
@@ -1143,9 +1158,11 @@
self._do_debug(q)
return self.query(q)
- def notification_handler(self, event, callback, arg_dict={}, timeout=None):
+ def notification_handler(self,
+ event, callback, arg_dict=None, timeout=None, stop_event=None):
"""Get notification handler that will run the given callback."""
- return NotificationHandler(self.db, event, callback, arg_dict, timeout)
+ return NotificationHandler(self,
+ event, callback, arg_dict, timeout, stop_event)
# if run as script, print some information
Modified: trunk/tests/test_classic.py
==============================================================================
--- trunk/tests/test_classic.py Sun Jan 17 11:19:32 2016 (r761)
+++ trunk/tests/test_classic.py Sun Jan 17 11:32:18 2016 (r762)
@@ -231,7 +231,7 @@
break
sleep(0.01)
self.assertTrue(target.listening)
- self.assertTrue(thread.isAlive())
+ self.assertTrue(thread.is_alive())
# Open another connection for sending notifications.
db2 = opendb()
# Generate notification from the other connection.
@@ -259,7 +259,7 @@
self.assertTrue(isinstance(arg_dict['pid'], int))
self.assertFalse(self.notify_timeout)
arg_dict['called'] = False
- self.assertTrue(thread.isAlive())
+ self.assertTrue(thread.is_alive())
# Generate stop notification.
if call_notify:
target.notify(db2, stop=True, payload='payload 2')
@@ -278,7 +278,7 @@
self.assertTrue(isinstance(arg_dict['pid'], int))
self.assertFalse(self.notify_timeout)
thread.join(5)
- self.assertFalse(thread.isAlive())
+ self.assertFalse(thread.is_alive())
self.assertFalse(target.listening)
target.close()
except Exception:
@@ -314,7 +314,7 @@
# Verify that we've indeed timed out.
self.assertFalse(arg_dict.get('called'))
self.assertTrue(self.notify_timeout)
- self.assertFalse(thread.isAlive())
+ self.assertFalse(thread.is_alive())
self.assertFalse(target.listening)
target.close()
Modified: trunk/tests/test_classic_dbwrapper.py
==============================================================================
--- trunk/tests/test_classic_dbwrapper.py Sun Jan 17 11:19:32 2016
(r761)
+++ trunk/tests/test_classic_dbwrapper.py Sun Jan 17 11:32:18 2016
(r762)
@@ -238,11 +238,17 @@
self.assertIs(self.db.db, con)
self.db.query("select 1+1")
self.db.close()
+ self.assertRaises(pg.InternalError, self.db.reset)
def testMethodReopen(self):
con = self.db.db
self.db.reopen()
self.assertIsNot(self.db.db, con)
+ con = self.db.db
+ self.db.query("select 1+1")
+ self.db.close()
+ self.db.reopen()
+ self.assertIsNot(self.db.db, con)
self.db.query("select 1+1")
self.db.close()
@@ -1919,6 +1925,83 @@
self.assertIsInstance(r, bytes)
self.assertEqual(r, s)
+ def testNotificationHandler(self):
+ # the notification handler itself is tested separately
+ f = self.db.notification_handler
+ callback = lambda arg_dict: None
+ handler = f('test', callback)
+ self.assertIsInstance(handler, pg.NotificationHandler)
+ self.assertIs(handler.db, self.db)
+ self.assertEqual(handler.event, 'test')
+ self.assertEqual(handler.stop_event, 'stop_test')
+ self.assertIs(handler.callback, callback)
+ self.assertIsInstance(handler.arg_dict, dict)
+ self.assertEqual(handler.arg_dict, {})
+ self.assertIsNone(handler.timeout)
+ self.assertFalse(handler.listening)
+ handler.close()
+ self.assertIsNone(handler.db)
+ self.db.reopen()
+ self.assertIsNone(handler.db)
+ handler = f('test2', callback, timeout=2)
+ self.assertIsInstance(handler, pg.NotificationHandler)
+ self.assertIs(handler.db, self.db)
+ self.assertEqual(handler.event, 'test2')
+ self.assertEqual(handler.stop_event, 'stop_test2')
+ self.assertIs(handler.callback, callback)
+ self.assertIsInstance(handler.arg_dict, dict)
+ self.assertEqual(handler.arg_dict, {})
+ self.assertEqual(handler.timeout, 2)
+ self.assertFalse(handler.listening)
+ handler.close()
+ self.assertIsNone(handler.db)
+ self.db.reopen()
+ self.assertIsNone(handler.db)
+ arg_dict = {'testing': 3}
+ handler = f('test3', callback, arg_dict=arg_dict)
+ self.assertIsInstance(handler, pg.NotificationHandler)
+ self.assertIs(handler.db, self.db)
+ self.assertEqual(handler.event, 'test3')
+ self.assertEqual(handler.stop_event, 'stop_test3')
+ self.assertIs(handler.callback, callback)
+ self.assertIs(handler.arg_dict, arg_dict)
+ self.assertEqual(arg_dict['testing'], 3)
+ self.assertIsNone(handler.timeout)
+ self.assertFalse(handler.listening)
+ handler.close()
+ self.assertIsNone(handler.db)
+ self.db.reopen()
+ self.assertIsNone(handler.db)
+ handler = f('test4', callback, stop_event='stop4')
+ self.assertIsInstance(handler, pg.NotificationHandler)
+ self.assertIs(handler.db, self.db)
+ self.assertEqual(handler.event, 'test4')
+ self.assertEqual(handler.stop_event, 'stop4')
+ self.assertIs(handler.callback, callback)
+ self.assertIsInstance(handler.arg_dict, dict)
+ self.assertEqual(handler.arg_dict, {})
+ self.assertIsNone(handler.timeout)
+ self.assertFalse(handler.listening)
+ handler.close()
+ self.assertIsNone(handler.db)
+ self.db.reopen()
+ self.assertIsNone(handler.db)
+ arg_dict = {'testing': 5}
+ handler = f('test5', callback, arg_dict, 1.5, 'stop5')
+ self.assertIsInstance(handler, pg.NotificationHandler)
+ self.assertIs(handler.db, self.db)
+ self.assertEqual(handler.event, 'test5')
+ self.assertEqual(handler.stop_event, 'stop5')
+ self.assertIs(handler.callback, callback)
+ self.assertIs(handler.arg_dict, arg_dict)
+ self.assertEqual(arg_dict['testing'], 5)
+ self.assertEqual(handler.timeout, 1.5)
+ self.assertFalse(handler.listening)
+ handler.close()
+ self.assertIsNone(handler.db)
+ self.db.reopen()
+ self.assertIsNone(handler.db)
+
class TestDBClassNonStdOpts(TestDBClass):
"""Test the methods of the DB class with non-standard global options."""
@@ -1957,7 +2040,6 @@
def setUpClass(cls):
db = DB()
query = db.query
- query("set client_min_messages=warning")
for num_schema in range(5):
if num_schema:
schema = "s%d" % num_schema
@@ -1982,7 +2064,6 @@
def tearDownClass(cls):
db = DB()
query = db.query
- query("set client_min_messages=warning")
for num_schema in range(5):
if num_schema:
schema = "s%d" % num_schema
@@ -1995,7 +2076,6 @@
def setUp(self):
self.db = DB()
- self.db.query("set client_min_messages=warning")
def tearDown(self):
self.doCleanups()
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql