Author: cito
Date: Mon Jan 7 16:10:55 2013
New Revision: 511
Log:
Simplify and rename notification handler again.
Modified:
trunk/module/TEST_PyGreSQL_classic.py
trunk/module/pg.py
Modified: trunk/module/TEST_PyGreSQL_classic.py
==============================================================================
--- trunk/module/TEST_PyGreSQL_classic.py Mon Jan 7 03:01:28 2013 (r510)
+++ trunk/module/TEST_PyGreSQL_classic.py Mon Jan 7 16:10:55 2013 (r511)
@@ -239,8 +239,8 @@
for call_notify in False, True:
db = opendb()
# Get function under test, can be standalone or DB method.
- fut = db.when_notified if run_as_method else partial(
- WhenNotified, db)
+ fut = db.notification_handler if run_as_method else partial(
+ NotificationHandler, db)
arg_dict = dict(event=None, called=False)
self.notify_timeout = False
# Listen for 'event_1'.
@@ -254,13 +254,12 @@
sleep(0.01)
self.assertTrue(target.listening)
self.assertTrue(thread.isAlive())
- # Generate notification.
+ # Open another connection for sending notifications.
+ db2 = opendb()
+ # Generate notification from the other connection.
if call_notify:
- target.notify(payload='payload 1')
+ target.notify(db2, payload='payload 1')
else:
- # Open another connection for sending notifications.
- db2 = opendb()
- # Generate notification from the other connection.
db2.query("notify event_1, 'payload 1'")
# Wait until the notification has been caught.
for n in xrange(500):
@@ -277,10 +276,10 @@
self.assertTrue(thread.isAlive())
# Generate stop notification.
if call_notify:
- target.notify(stop=True, payload='payload 2')
+ target.notify(db2, stop=True, payload='payload 2')
else:
db2.query("notify stop_event_1, 'payload 2'")
- db2.close()
+ db2.close()
# Wait until the notification has been caught.
for n in xrange(500):
if arg_dict['called'] or self.notify_timeout:
@@ -301,8 +300,8 @@
for run_as_method in False, True:
db = opendb()
# Get function under test, can be standalone or DB method.
- fut = db.when_notified if run_as_method else partial(
- WhenNotified, db)
+ fut = db.notification_handler if run_as_method else partial(
+ NotificationHandler, db)
arg_dict = dict(event=None, called=False)
self.notify_timeout = False
# Listen for 'event_1' with timeout of 10ms.
Modified: trunk/module/pg.py
==============================================================================
--- trunk/module/pg.py Mon Jan 7 03:01:28 2013 (r510)
+++ trunk/module/pg.py Mon Jan 7 16:10:55 2013 (r511)
@@ -32,7 +32,6 @@
import select
import warnings
-from threading import Lock
try:
frozenset
except NameError: # Python < 2.4
@@ -141,76 +140,74 @@
return _db_error(msg, ProgrammingError)
-class WhenNotified(object):
+class NotificationHandler(object):
"""A PostgreSQL client-side asynchronous notification handler."""
def __init__(self, db, event, callback, arg_dict=None, timeout=None):
"""Initialize the notification handler.
- db - PostgreSQL connection object.
- event - Event to LISTEN for.
- callback - Event callback.
+ db - PostgreSQL connection object.
+ event - Event (notification channel) to LISTEN for.
+ callback - Event callback function.
arg_dict - A dictionary passed as the argument to the callback.
timeout - Timeout in seconds; a floating point number denotes
- fractions of seconds. If it is absent or None, the
- callers will never time out."""
+ fractions of seconds. If it is absent or None, the
+ callers will never time out.
+ """
if isinstance(db, DB):
db = db.db
self.db = db
self.event = event
- self.stop = 'stop_%s' % event
+ self.stop_event = 'stop_%s' % event
self.listening = False
self.callback = callback
if arg_dict is None:
arg_dict = {}
self.arg_dict = arg_dict
self.timeout = timeout
- self.lock = Lock()
def __del__(self):
self.close()
def close(self):
+ """Stop listening and close the connection."""
if self.db:
self.unlisten()
self.db.close()
self.db = None
def listen(self):
+ """Start listening for the event and the stop event."""
if not self.listening:
- self.lock.acquire()
- try:
- self.db.query('listen "%s"' % self.event)
- self.db.query('listen "%s"' % self.stop)
- self.listening = True
- finally:
- self.lock.release()
+ self.db.query('listen "%s"' % self.event)
+ self.db.query('listen "%s"' % self.stop_event)
+ self.listening = True
def unlisten(self):
+ """Stop listening for the event and the stop event."""
if self.listening:
- self.lock.acquire()
- try:
- self.db.query('unlisten "%s"' % self.event)
- self.db.query('unlisten "%s"' % self.stop)
- self.listening = False
- finally:
- self.lock.release()
+ self.db.query('unlisten "%s"' % self.event)
+ self.db.query('unlisten "%s"' % self.stop_event)
+ self.listening = False
+
+ def notify(self, db=None, stop=False, payload=None):
+ """Generate a notification.
+
+ Note: If the main loop is running in another thread, you must pass
+ a different database connection to avoid a collision.
- def notify(self, stop=False, payload=None):
+ """
+ if not db:
+ db = self.db
if self.listening:
- q = 'notify "%s"' % (stop and self.stop or self.event)
+ q = 'notify "%s"' % (stop and self.stop_event or self.event)
if payload:
q += ", '%s'" % payload
- self.lock.acquire()
- try:
- ret = self.db.query(q)
- finally:
- self.lock.release()
- return ret
+ return db.query(q)
- def __call__(self):
- """Invoke the handler.
+ def __call__(self, close=False):
+ """Invoke the notification handler.
The handler is a loop that actually LISTENs for two NOTIFY messages:
@@ -221,6 +218,9 @@
invoked with <arg_dict>. If the NOTIFY message is stop_<event>, the
handler UNLISTENs both <event> and stop_<event> and exits.
+ Note: If you run this loop in another thread, don't use the same
+ database connection for database operations in the main thread.
+
"""
self.listen()
_ilist = [self.db.fileno()]
@@ -232,34 +232,30 @@
self.callback(None)
break
else:
- self.lock.acquire()
- try:
- notice = self.db.getnotify()
- finally:
- self.lock.release()
+ notice = self.db.getnotify()
if notice is None:
continue
event, pid, extra = notice
- if event in (self.event, self.stop):
+ if event in (self.event, self.stop_event):
self.arg_dict['pid'] = pid
self.arg_dict['event'] = event
self.arg_dict['extra'] = extra
self.callback(self.arg_dict)
- if event == self.stop:
+ if event == self.stop_event:
self.unlisten()
break
else:
self.unlisten()
raise _db_error(
'listening for "%s" and "%s", but notified of "%s"'
- % (self.event, self.stop, event))
+ % (self.event, self.stop_event, event))
def pgnotify(*args, **kw):
- """Same as WhenNotified, under the traditional name."""
- warnings.warn("pgnotify is deprecated, use WhenNotified instead.",
+ """Same as NotificationHandler, under the traditional name."""
+ warnings.warn("pgnotify is deprecated, use NotificationHandler instead.",
DeprecationWarning, stacklevel=2)
- return WhenNotified(*args, **kw)
+ return NotificationHandler(*args, **kw)
# The actual PostGreSQL database connection interface:
@@ -963,9 +959,9 @@
self._do_debug(q)
return int(self.db.query(q))
- def when_notified(self, event, callback, arg_dict={}, timeout=None):
+ def notification_handler(self, event, callback, arg_dict={}, timeout=None):
"""Get notification handler that will run the given callback."""
- return WhenNotified(self.db, event, callback, arg_dict, timeout)
+ return NotificationHandler(self.db, event, callback, arg_dict, timeout)
# if run as script, print some information
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql